Compare commits

..

8 Commits

Author SHA1 Message Date
yongkangc
eb570d658c fix: start static file tests from block 0, remove unused imports
Amp-Thread-ID: https://ampcode.com/threads/T-019be7bf-0e67-770d-9c38-45c6b5b286ae
2026-01-22 22:23:58 +00:00
yongkangc
8b9a644853 feat(prune): support pruning history from static files 2026-01-22 22:20:37 +00:00
yongkangc
165b362aaf feat(prune): add tests for RocksDB history pruners and invariants healing
- Add comprehensive test suite for AccountsHistoryPruner (4 tests)
- Add comprehensive test suite for StoragesHistoryPruner (4 tests)
- Add changeset-based healing tests for invariants.rs (3 tests)
- Refactor StoragesHistoryPruner to use StorageChangeSetReader trait
  abstraction instead of explicit static file/MDBX logic
- Fix clippy warnings in storage history pruner

The pruners now use trait abstractions (ChangeSetReader for accounts,
StorageChangeSetReader for storage) which handle static file vs MDBX
routing internally, making the pruner code simpler and more consistent.

Closes #20417

Amp-Thread-ID: https://ampcode.com/threads/T-019be7b1-7664-759f-9590-46716d320658
2026-01-22 22:06:04 +00:00
yongkangc
2171624e28 fix: correct checkpoint logic in RocksDB history pruners
- Remove saturating_sub(1) that caused keys in last scanned block to be skipped
- Move limiter check after block processing to capture completed work
- Remove redundant limiter check at loop start in storage pruner
- Trim verbose module docs

Amp-Thread-ID: https://ampcode.com/threads/T-019be770-1374-70a9-8066-f108a1dd15df
2026-01-22 21:17:31 +00:00
yongkangc
502d18a0b8 chore(prune): clean up verbose comments in RocksDB history pruners 2026-01-22 21:17:31 +00:00
yongkangc
93160ff97f refactor(prune): use ChangeSetReader for RocksDB history pruners
Update AccountsHistoryPruner and StoragesHistoryPruner to read changesets
via ChangeSetReader/StorageChangeSetReader traits instead of directly
iterating MDBX tables. This allows the pruners to work regardless of
whether changesets are stored in MDBX or static files.

Key changes:
- Replace DBProvider<Tx: DbTxMut> bounds with ChangeSetReader/StorageChangeSetReader
- Use account_block_changeset()/storage_block_changeset() to iterate changesets
- Remove MDBX changeset deletion (handled by regular history pruner segments)
- Keep existing checkpoint and limiter logic

Closes #20417
2026-01-22 21:17:31 +00:00
yongkangc
e913be192c review: address feedback on RocksDB history pruner
- Add atomicity limitation documentation to segment module docs
- Deduplicate prune_*_history_to with shared helper function
- Add 8 more storage history equivalence tests (parity with account)
- Include updated_shards in pruned count
- Inline to_block variable
- Remove redundant inner cfg attributes
2026-01-22 21:17:31 +00:00
yongkangc
299de4a5f9 feat(prune): add RocksDB history indices pruner segments
Add AccountsHistoryPruner and StoragesHistoryPruner segments for pruning
history indices stored in RocksDB. These segments use changeset-driven
pruning: they prune MDBX changesets first to determine which keys need
RocksDB history pruning, then prune RocksDB shards for those keys.

Key changes:
- Add prune_account_history_to and prune_storage_history_to to RocksDBBatch
- Add PruneShardOutcome enum (Deleted, Updated, Unchanged)
- Add AccountsHistoryPruner and StoragesHistoryPruner segments
- Feature-gated with #[cfg(all(unix, feature = "rocksdb"))]
- Add 21 tests covering edge cases and MDBX equivalence

The implementation maintains the sentinel shard invariant (last shard
has u64::MAX key) and produces logically equivalent results to MDBX
prune_shard.

Closes #21292

Amp-Thread-ID: https://ampcode.com/threads/T-019be66d-e360-712b-8f84-af0142f8a44b
2026-01-22 21:17:31 +00:00
162 changed files with 5427 additions and 6045 deletions

2
.github/CODEOWNERS vendored
View File

@@ -37,7 +37,7 @@ crates/storage/db/ @joshieDo
crates/storage/errors/ @joshieDo
crates/storage/libmdbx-rs/ @shekhirin
crates/storage/nippy-jar/ @joshieDo @shekhirin
crates/storage/provider/ @joshieDo @shekhirin @yongkangc
crates/storage/provider/ @joshieDo @shekhirin
crates/storage/storage-api/ @joshieDo
crates/tasks/ @mattsse
crates/tokio-util/ @mattsse

54
.github/workflows/docker-git.yml vendored Normal file
View File

@@ -0,0 +1,54 @@
# Publishes the Docker image, only to be used with `workflow_dispatch`. The
# images from this workflow will be tagged with the git sha of the branch used
# and will NOT tag it as `latest`.
name: docker-git
on:
workflow_dispatch: {}
env:
REPO_NAME: ${{ github.repository_owner }}/reth
IMAGE_NAME: ${{ github.repository_owner }}/reth
OP_IMAGE_NAME: ${{ github.repository_owner }}/op-reth
CARGO_TERM_COLOR: always
DOCKER_IMAGE_NAME: ghcr.io/${{ github.repository_owner }}/reth
OP_DOCKER_IMAGE_NAME: ghcr.io/${{ github.repository_owner }}/op-reth
DOCKER_USERNAME: ${{ github.actor }}
GIT_SHA: ${{ github.sha }}
jobs:
build:
name: build and push
runs-on: ubuntu-24.04
permissions:
packages: write
contents: read
strategy:
fail-fast: false
matrix:
build:
- name: 'Build and push the git-sha-tagged reth image'
command: 'make PROFILE=maxperf GIT_SHA=$GIT_SHA docker-build-push-git-sha'
- name: 'Build and push the git-sha-tagged op-reth image'
command: 'make IMAGE_NAME=$OP_IMAGE_NAME DOCKER_IMAGE_NAME=$OP_DOCKER_IMAGE_NAME GIT_SHA=$GIT_SHA PROFILE=maxperf op-docker-build-push-git-sha'
steps:
- uses: actions/checkout@v6
- uses: rui314/setup-mold@v1
- uses: dtolnay/rust-toolchain@stable
- uses: Swatinem/rust-cache@v2
with:
cache-on-failure: true
- name: Install cross main
id: cross_main
run: |
cargo install cross --git https://github.com/cross-rs/cross
- name: Log in to Docker
run: |
echo "${{ secrets.GITHUB_TOKEN }}" | docker login ghcr.io --username ${DOCKER_USERNAME} --password-stdin
- name: Set up Docker builder
run: |
docker run --privileged --rm tonistiigi/binfmt --install arm64,amd64
docker buildx create --use --name cross-builder
- name: Build and push ${{ matrix.build.name }}
run: ${{ matrix.build.command }}

65
.github/workflows/docker-nightly.yml vendored Normal file
View File

@@ -0,0 +1,65 @@
# Publishes the nightly Docker image.
name: docker-nightly
on:
workflow_dispatch:
schedule:
- cron: "0 1 * * *"
env:
REPO_NAME: ${{ github.repository_owner }}/reth
IMAGE_NAME: ${{ github.repository_owner }}/reth
OP_IMAGE_NAME: ${{ github.repository_owner }}/op-reth
CARGO_TERM_COLOR: always
DOCKER_IMAGE_NAME: ghcr.io/${{ github.repository_owner }}/reth
OP_DOCKER_IMAGE_NAME: ghcr.io/${{ github.repository_owner }}/op-reth
DOCKER_USERNAME: ${{ github.actor }}
jobs:
build:
name: build and push
runs-on: ubuntu-24.04
permissions:
packages: write
contents: read
strategy:
fail-fast: false
matrix:
build:
- name: 'Build and push the nightly reth image'
command: 'make PROFILE=maxperf docker-build-push-nightly'
- name: 'Build and push the nightly edge profiling reth image'
command: 'make PROFILE=profiling docker-build-push-nightly-edge-profiling'
- name: 'Build and push the nightly profiling reth image'
command: 'make PROFILE=profiling docker-build-push-nightly-profiling'
- name: 'Build and push the nightly op-reth image'
command: 'make IMAGE_NAME=$OP_IMAGE_NAME DOCKER_IMAGE_NAME=$OP_DOCKER_IMAGE_NAME PROFILE=maxperf op-docker-build-push-nightly'
- name: 'Build and push the nightly edge profiling op-reth image'
command: 'make IMAGE_NAME=$OP_IMAGE_NAME DOCKER_IMAGE_NAME=$OP_DOCKER_IMAGE_NAME PROFILE=profiling op-docker-build-push-nightly-edge-profiling'
- name: 'Build and push the nightly profiling op-reth image'
command: 'make IMAGE_NAME=$OP_IMAGE_NAME DOCKER_IMAGE_NAME=$OP_DOCKER_IMAGE_NAME PROFILE=profiling op-docker-build-push-nightly-profiling'
steps:
- uses: actions/checkout@v6
- name: Remove bloatware
uses: laverdet/remove-bloatware@v1.0.0
with:
docker: true
lang: rust
- uses: rui314/setup-mold@v1
- uses: dtolnay/rust-toolchain@stable
- uses: Swatinem/rust-cache@v2
with:
cache-on-failure: true
- name: Install cross main
id: cross_main
run: |
cargo install cross --git https://github.com/cross-rs/cross
- name: Log in to Docker
run: |
echo "${{ secrets.GITHUB_TOKEN }}" | docker login ghcr.io --username ${DOCKER_USERNAME} --password-stdin
- name: Set up Docker builder
run: |
docker run --privileged --rm tonistiigi/binfmt --install arm64,amd64
docker buildx create --use --name cross-builder
- name: Build and push ${{ matrix.build.name }}
run: ${{ matrix.build.command }}

View File

@@ -1,9 +1,4 @@
# Publishes Docker images.
#
# Triggers:
# - Push tag v*: builds release (RC or latest)
# - Schedule: builds nightly + profiling
# - Manual: builds git-sha or nightly
# Publishes the Docker image.
name: docker
@@ -11,94 +6,84 @@ on:
push:
tags:
- v*
schedule:
- cron: "0 1 * * *"
workflow_dispatch:
inputs:
build_type:
description: "Build type"
required: true
type: choice
options:
- git-sha
- nightly
default: git-sha
dry_run:
description: "Skip pushing images (dry run)"
required: false
type: boolean
default: false
env:
IMAGE_NAME: ${{ github.repository_owner }}/reth
OP_IMAGE_NAME: ${{ github.repository_owner }}/op-reth
CARGO_TERM_COLOR: always
DOCKER_IMAGE_NAME: ghcr.io/${{ github.repository_owner }}/reth
OP_DOCKER_IMAGE_NAME: ghcr.io/${{ github.repository_owner }}/op-reth
DOCKER_USERNAME: ${{ github.actor }}
jobs:
build:
name: Build Docker images
build-rc:
if: contains(github.ref, '-rc')
name: build and push as release candidate
runs-on: ubuntu-24.04
permissions:
packages: write
contents: read
id-token: write
strategy:
fail-fast: false
matrix:
build:
- name: "Build and push reth image"
command: "make IMAGE_NAME=$IMAGE_NAME DOCKER_IMAGE_NAME=$DOCKER_IMAGE_NAME PROFILE=maxperf docker-build-push"
- name: "Build and push op-reth image"
command: "make IMAGE_NAME=$OP_IMAGE_NAME DOCKER_IMAGE_NAME=$OP_DOCKER_IMAGE_NAME PROFILE=maxperf op-docker-build-push"
steps:
- uses: actions/checkout@v6
- name: Set up Depot CLI
uses: depot/setup-action@v1
- name: Log in to GHCR
uses: docker/login-action@v3
- uses: rui314/setup-mold@v1
- uses: dtolnay/rust-toolchain@stable
- uses: Swatinem/rust-cache@v2
with:
registry: ghcr.io
username: ${{ github.actor }}
password: ${{ secrets.GITHUB_TOKEN }}
- name: Get git info for vergen
id: git
cache-on-failure: true
- name: Install cross main
id: cross_main
run: |
echo "sha=${{ github.sha }}" >> "$GITHUB_OUTPUT"
echo "describe=$(git describe --always --tags)" >> "$GITHUB_OUTPUT"
echo "dirty=false" >> "$GITHUB_OUTPUT"
- name: Determine build parameters
id: params
cargo install cross --git https://github.com/cross-rs/cross
- name: Log in to Docker
run: |
REGISTRY="ghcr.io/${{ github.repository_owner }}"
echo "${{ secrets.GITHUB_TOKEN }}" | docker login ghcr.io --username ${DOCKER_USERNAME} --password-stdin
- name: Set up Docker builder
run: |
docker run --privileged --rm tonistiigi/binfmt --install arm64,amd64
docker buildx create --use --name cross-builder
- name: Build and push ${{ matrix.build.name }}
run: ${{ matrix.build.command }}
if [[ "${{ github.event_name }}" == "push" ]]; then
VERSION="${GITHUB_REF#refs/tags/}"
echo "targets=ethereum optimism" >> "$GITHUB_OUTPUT"
# Add 'latest' tag for non-RC releases
if [[ ! "$VERSION" =~ -rc ]]; then
echo "ethereum_tags=${REGISTRY}/reth:${VERSION},${REGISTRY}/reth:latest" >> "$GITHUB_OUTPUT"
echo "optimism_tags=${REGISTRY}/op-reth:${VERSION},${REGISTRY}/op-reth:latest" >> "$GITHUB_OUTPUT"
else
echo "ethereum_tags=${REGISTRY}/reth:${VERSION}" >> "$GITHUB_OUTPUT"
echo "optimism_tags=${REGISTRY}/op-reth:${VERSION}" >> "$GITHUB_OUTPUT"
fi
elif [[ "${{ github.event_name }}" == "schedule" ]] || [[ "${{ inputs.build_type }}" == "nightly" ]]; then
echo "targets=nightly" >> "$GITHUB_OUTPUT"
echo "ethereum_tags=${REGISTRY}/reth:nightly" >> "$GITHUB_OUTPUT"
echo "optimism_tags=${REGISTRY}/op-reth:nightly" >> "$GITHUB_OUTPUT"
else
# git-sha build
echo "targets=ethereum optimism" >> "$GITHUB_OUTPUT"
echo "ethereum_tags=${REGISTRY}/reth:${{ github.sha }}" >> "$GITHUB_OUTPUT"
echo "optimism_tags=${REGISTRY}/op-reth:${{ github.sha }}" >> "$GITHUB_OUTPUT"
fi
- name: Build and push images
uses: depot/bake-action@v1
env:
VERGEN_GIT_SHA: ${{ steps.git.outputs.sha }}
VERGEN_GIT_DESCRIBE: ${{ steps.git.outputs.describe }}
VERGEN_GIT_DIRTY: ${{ steps.git.outputs.dirty }}
DEPOT_TOKEN: ${{ secrets.DEPOT_TOKEN }}
build:
if: ${{ !contains(github.ref, '-rc') }}
name: build and push as latest
runs-on: ubuntu-24.04
permissions:
packages: write
contents: read
strategy:
fail-fast: false
matrix:
build:
- name: "Build and push reth image"
command: "make IMAGE_NAME=$IMAGE_NAME DOCKER_IMAGE_NAME=$DOCKER_IMAGE_NAME PROFILE=maxperf docker-build-push-latest"
- name: "Build and push op-reth image"
command: "make IMAGE_NAME=$OP_IMAGE_NAME DOCKER_IMAGE_NAME=$OP_DOCKER_IMAGE_NAME PROFILE=maxperf op-docker-build-push-latest"
steps:
- uses: actions/checkout@v6
- uses: rui314/setup-mold@v1
- uses: dtolnay/rust-toolchain@stable
- uses: Swatinem/rust-cache@v2
with:
project: ${{ vars.DEPOT_PROJECT_ID }}
files: docker-bake.hcl
targets: ${{ steps.params.outputs.targets }}
push: ${{ !(github.event_name == 'workflow_dispatch' && inputs.dry_run) }}
set: |
ethereum.tags=${{ steps.params.outputs.ethereum_tags }}
optimism.tags=${{ steps.params.outputs.optimism_tags }}
cache-on-failure: true
- name: Install cross main
id: cross_main
run: |
cargo install cross --git https://github.com/cross-rs/cross
- name: Log in to Docker
run: |
echo "${{ secrets.GITHUB_TOKEN }}" | docker login ghcr.io --username ${DOCKER_USERNAME} --password-stdin
- name: Set up Docker builder
run: |
docker run --privileged --rm tonistiigi/binfmt --install arm64,amd64
docker buildx create --use --name cross-builder
- name: Build and push ${{ matrix.build.name }}
run: ${{ matrix.build.command }}

182
Cargo.lock generated
View File

@@ -106,9 +106,9 @@ checksum = "683d7910e743518b0e34f1186f92494becacb047c7b6bf616c96772180fef923"
[[package]]
name = "alloy-chains"
version = "0.2.30"
version = "0.2.28"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "90f374d3c6d729268bbe2d0e0ff992bb97898b2df756691a62ee1d5f0506bc39"
checksum = "3842d8c52fcd3378039f4703dba392dca8b546b1c8ed6183048f8dab95b2be78"
dependencies = [
"alloy-primitives",
"alloy-rlp",
@@ -249,9 +249,9 @@ dependencies = [
[[package]]
name = "alloy-eip7928"
version = "0.3.2"
version = "0.3.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d3231de68d5d6e75332b7489cfcc7f4dfabeba94d990a10e4b923af0e6623540"
checksum = "6adac476434bf024279164dcdca299309f0c7d1e3557024eb7a83f8d9d01c6b5"
dependencies = [
"alloy-primitives",
"alloy-rlp",
@@ -495,7 +495,7 @@ dependencies = [
"async-stream",
"async-trait",
"auto_impl",
"dashmap",
"dashmap 6.1.0",
"either",
"futures",
"futures-utils-wasm",
@@ -1774,7 +1774,7 @@ dependencies = [
"bytemuck",
"cfg-if",
"cow-utils",
"dashmap",
"dashmap 6.1.0",
"dynify",
"fast-float2",
"float16",
@@ -1968,6 +1968,12 @@ version = "1.2.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7575182f7272186991736b70173b0ea045398f984bf5ebbb3804736ce1330c9d"
[[package]]
name = "bytecount"
version = "0.6.9"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "175812e0be2bccb6abe50bb8d566126198344f707e304f45c648fd8f2cc0365e"
[[package]]
name = "bytemuck"
version = "1.24.0"
@@ -2057,6 +2063,19 @@ dependencies = [
"serde_core",
]
[[package]]
name = "cargo_metadata"
version = "0.14.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "4acbb09d9ee8e23699b9634375c72795d095bf268439da88562cf9b501f181fa"
dependencies = [
"camino",
"cargo-platform 0.1.9",
"semver 1.0.27",
"serde",
"serde_json",
]
[[package]]
name = "cargo_metadata"
version = "0.19.2"
@@ -2108,9 +2127,9 @@ dependencies = [
[[package]]
name = "cc"
version = "1.2.54"
version = "1.2.53"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6354c81bbfd62d9cfa9cb3c773c2b7b2a3a482d569de977fd0e961f6e7c00583"
checksum = "755d2fce177175ffca841e9a06afdb2c4ab0f593d53b4dee48147dfaade85932"
dependencies = [
"find-msvc-tools",
"jobserver",
@@ -2911,6 +2930,19 @@ dependencies = [
"syn 2.0.114",
]
[[package]]
name = "dashmap"
version = "5.5.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "978747c1d849a7d2ee5e8adc0159961c48fb7e5db2f06af6723b80123bb53856"
dependencies = [
"cfg-if",
"hashbrown 0.14.5",
"lock_api",
"once_cell",
"parking_lot_core",
]
[[package]]
name = "dashmap"
version = "6.1.0"
@@ -3463,6 +3495,15 @@ dependencies = [
"windows-sys 0.61.2",
]
[[package]]
name = "error-chain"
version = "0.12.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "2d2f06b9cac1506ece98fe3231e3cc9c4410ec3d5b1f24ae1c8946f0742cdefc"
dependencies = [
"version_check",
]
[[package]]
name = "ethereum_hashing"
version = "0.7.0"
@@ -4041,12 +4082,11 @@ checksum = "8591b0bcc8a98a64310a2fae1bb3e9b8564dd10e381e6e28010fde8e8e8568db"
[[package]]
name = "fixed-cache"
version = "0.1.7"
version = "0.1.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0aaafa7294e9617eb29e5c684a3af33324ef512a1bf596af2d1938a03798da29"
checksum = "25d3af83468398d500e9bc19e001812dcb1a11e4d3d6a5956c789aa3c11a8cb5"
dependencies = [
"equivalent",
"typeid",
]
[[package]]
@@ -4811,7 +4851,7 @@ dependencies = [
"libc",
"percent-encoding",
"pin-project-lite",
"socket2 0.6.2",
"socket2 0.6.1",
"tokio",
"tower-service",
"tracing",
@@ -5553,9 +5593,9 @@ dependencies = [
[[package]]
name = "libm"
version = "0.2.16"
version = "0.2.15"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b6d2cec3eae94f9f509c767b45932f1ada8350c4bdb85af2fcab4a3c14807981"
checksum = "f9fbbcab51052fe104eb5e5d351cf728d30a5be1fe14d9be8a3b097481fb97de"
[[package]]
name = "libp2p-identity"
@@ -5927,6 +5967,21 @@ dependencies = [
"unicase",
]
[[package]]
name = "mini-moka"
version = "0.10.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c325dfab65f261f386debee8b0969da215b3fa0037e74c8a1234db7ba986d803"
dependencies = [
"crossbeam-channel",
"crossbeam-utils",
"dashmap 5.5.3",
"skeptic",
"smallvec",
"tagptr",
"triomphe",
]
[[package]]
name = "minimal-lexical"
version = "0.2.1"
@@ -6153,9 +6208,9 @@ dependencies = [
[[package]]
name = "num-conv"
version = "0.2.0"
version = "0.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "cf97ec579c3c42f953ef76dbf8d55ac91fb219dde70e49aa4a6b7d74e9919050"
checksum = "51d515d32fb182ee37cda2ccdcb92950d6a3c2893aa280e540671c2cd0f3b1d9"
[[package]]
name = "num-integer"
@@ -6460,9 +6515,9 @@ checksum = "c08d65885ee38876c4f86fa503fb49d7b507c2b62552df7c70b2fce627e06381"
[[package]]
name = "openssl-probe"
version = "0.2.1"
version = "0.2.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7c87def4c32ab89d880effc9e097653c8da5d6ef28e6b539d313baaacfbafcbe"
checksum = "9f50d9b3dabb09ecd771ad0aa242ca6894994c130308ca3d7684634df8037391"
[[package]]
name = "opentelemetry"
@@ -6967,9 +7022,9 @@ dependencies = [
[[package]]
name = "proc-macro2"
version = "1.0.106"
version = "1.0.105"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8fd00f0bb2e90d81d1044c2b32617f68fcb9fa3bb7640c23e9c748e53fb30934"
checksum = "535d180e0ecab6268a3e718bb9fd44db66bbbc256257165fc699dadf70d16fe7"
dependencies = [
"unicode-ident",
]
@@ -7117,6 +7172,17 @@ dependencies = [
"syn 2.0.114",
]
[[package]]
name = "pulldown-cmark"
version = "0.9.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "57206b407293d2bcd3af849ce869d52068623f19e1b5ff8e8778e3309439682b"
dependencies = [
"bitflags 2.10.0",
"memchr",
"unicase",
]
[[package]]
name = "quanta"
version = "0.12.6"
@@ -7160,7 +7226,7 @@ dependencies = [
"quinn-udp",
"rustc-hash",
"rustls",
"socket2 0.6.2",
"socket2 0.6.1",
"thiserror 2.0.18",
"tokio",
"tracing",
@@ -7197,16 +7263,16 @@ dependencies = [
"cfg_aliases",
"libc",
"once_cell",
"socket2 0.6.2",
"socket2 0.6.1",
"tracing",
"windows-sys 0.60.2",
]
[[package]]
name = "quote"
version = "1.0.44"
version = "1.0.43"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "21b2ebcf727b7760c461f091f9f0f539b77b8e87f2fd88131e7f1b433b3cece4"
checksum = "dc74d9a594b72ae6656596548f56f667211f8a97b3d4c3d467150794690dc40a"
dependencies = [
"proc-macro2",
]
@@ -8399,13 +8465,13 @@ dependencies = [
"assert_matches",
"codspeed-criterion-compat",
"crossbeam-channel",
"dashmap",
"dashmap 6.1.0",
"derive_more",
"eyre",
"fixed-cache",
"futures",
"metrics",
"metrics-util",
"mini-moka",
"moka",
"parking_lot",
"proptest",
@@ -9045,7 +9111,7 @@ dependencies = [
"bitflags 2.10.0",
"byteorder",
"codspeed-criterion-compat",
"dashmap",
"dashmap 6.1.0",
"derive_more",
"parking_lot",
"rand 0.9.2",
@@ -9130,7 +9196,6 @@ dependencies = [
"reth-eth-wire-types",
"reth-ethereum-forks",
"reth-ethereum-primitives",
"reth-evm-ethereum",
"reth-fs-util",
"reth-metrics",
"reth-net-banlist",
@@ -9998,7 +10063,6 @@ dependencies = [
"parking_lot",
"reth-chain-state",
"reth-chainspec",
"reth-evm",
"reth-metrics",
"reth-optimism-chainspec",
"reth-optimism-evm",
@@ -10154,7 +10218,7 @@ dependencies = [
"alloy-primitives",
"alloy-rpc-types-engine",
"assert_matches",
"dashmap",
"dashmap 6.1.0",
"eyre",
"itertools 0.14.0",
"metrics",
@@ -11054,8 +11118,6 @@ dependencies = [
"reth-chainspec",
"reth-eth-wire-types",
"reth-ethereum-primitives",
"reth-evm",
"reth-evm-ethereum",
"reth-execution-types",
"reth-fs-util",
"reth-metrics",
@@ -11064,7 +11126,6 @@ dependencies = [
"reth-storage-api",
"reth-tasks",
"reth-tracing",
"revm",
"revm-interpreter",
"revm-primitives",
"rustc-hash",
@@ -11185,7 +11246,7 @@ dependencies = [
"alloy-rlp",
"codspeed-criterion-compat",
"crossbeam-channel",
"dashmap",
"dashmap 6.1.0",
"derive_more",
"itertools 0.14.0",
"metrics",
@@ -11408,9 +11469,9 @@ dependencies = [
[[package]]
name = "revm-inspectors"
version = "0.34.1"
version = "0.34.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a24ca988ae1f7a0bb5688630579c00e867cd9f1df0a2f040623887f63d3b414c"
checksum = "4a1ce3f52a052d78cc251714d57bf05dc8bc75e269677de11805d3153300a2cd"
dependencies = [
"alloy-primitives",
"alloy-rpc-types-eth",
@@ -12321,6 +12382,21 @@ version = "1.0.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "56199f7ddabf13fe5074ce809e7d3f42b42ae711800501b5b16ea82ad029c39d"
[[package]]
name = "skeptic"
version = "0.13.7"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "16d23b015676c90a0f01c197bfdc786c20342c73a0afdda9025adb0bc42940a8"
dependencies = [
"bytecount",
"cargo_metadata 0.14.2",
"error-chain",
"glob",
"pulldown-cmark",
"tempfile",
"walkdir",
]
[[package]]
name = "sketches-ddsketch"
version = "0.3.0"
@@ -12389,9 +12465,9 @@ dependencies = [
[[package]]
name = "socket2"
version = "0.6.2"
version = "0.6.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "86f4aa3ad99f2088c990dfa82d367e19cb29268ed67c574d10d0a4bfe71f07e0"
checksum = "17129e116933cf371d018bb80ae557e889637989d8638274fb25622827b03881"
dependencies = [
"libc",
"windows-sys 0.60.2",
@@ -12792,9 +12868,9 @@ dependencies = [
[[package]]
name = "time"
version = "0.3.46"
version = "0.3.45"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9da98b7d9b7dad93488a84b8248efc35352b0b2657397d4167e7ad67e5d535e5"
checksum = "f9e442fc33d7fdb45aa9bfeb312c095964abdf596f7567261062b2a7107aaabd"
dependencies = [
"deranged",
"itoa",
@@ -12810,15 +12886,15 @@ dependencies = [
[[package]]
name = "time-core"
version = "0.1.8"
version = "0.1.7"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7694e1cfe791f8d31026952abf09c69ca6f6fa4e1a1229e18988f06a04a12dca"
checksum = "8b36ee98fd31ec7426d599183e8fe26932a8dc1fb76ddb6214d05493377d34ca"
[[package]]
name = "time-macros"
version = "0.2.26"
version = "0.2.25"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "78cc610bac2dcee56805c99642447d4c5dbde4d01f752ffea0199aee1f601dc4"
checksum = "71e552d1249bf61ac2a52db88179fd0673def1e1ad8243a00d9ec9ed71fee3dd"
dependencies = [
"num-conv",
"time-core",
@@ -12881,7 +12957,7 @@ dependencies = [
"parking_lot",
"pin-project-lite",
"signal-hook-registry",
"socket2 0.6.2",
"socket2 0.6.1",
"tokio-macros",
"windows-sys 0.61.2",
]
@@ -13361,6 +13437,12 @@ dependencies = [
"rlp",
]
[[package]]
name = "triomphe"
version = "0.1.15"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "dd69c5aa8f924c7519d6372789a74eac5b94fb0f8fcf0d4a97eb0bfc3e785f39"
[[package]]
name = "try-lock"
version = "0.2.5"
@@ -13386,12 +13468,6 @@ dependencies = [
"utf-8",
]
[[package]]
name = "typeid"
version = "1.0.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "bc7d623258602320d5c55d1bc22793b57daff0ec7efc270ea7d55ce1d5f5471c"
[[package]]
name = "typenum"
version = "1.19.0"
@@ -13548,9 +13624,9 @@ checksum = "06abde3611657adf66d383f00b093d7faecc7fa57071cce2578660c9f1010821"
[[package]]
name = "uuid"
version = "1.20.0"
version = "1.19.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ee48d38b119b0cd71fe4141b30f5ba9c7c5d9f4e7a3a8b4a674e4b6ef789976f"
checksum = "e2e054861b4bd027cd373e18e8d8d8e6548085000e41290d95ce0c373a654b4a"
dependencies = [
"getrandom 0.3.4",
"js-sys",

View File

@@ -481,7 +481,7 @@ revm-primitives = { version = "22.0.0", default-features = false }
revm-interpreter = { version = "32.0.0", default-features = false }
revm-database-interface = { version = "9.0.0", default-features = false }
op-revm = { version = "15.0.0", default-features = false }
revm-inspectors = "0.34.1"
revm-inspectors = "0.34.0"
# eth
alloy-chains = { version = "0.2.5", default-features = false }
@@ -588,7 +588,7 @@ tracing-appender = "0.2"
url = { version = "2.3", default-features = false }
zstd = "0.13"
byteorder = "1"
fixed-cache = { version = "0.1.7", features = ["stats"] }
mini-moka = "0.10"
moka = "0.12"
tar-no-std = { version = "0.3.2", default-features = false }
miniz_oxide = { version = "0.8.4", default-features = false }

15
Dockerfile.cross Normal file
View File

@@ -0,0 +1,15 @@
# This image is meant to enable cross-architecture builds.
# It assumes the reth binary has already been compiled for `$TARGETPLATFORM` and is
# locatable in `./dist/bin/$TARGETARCH`
FROM --platform=$TARGETPLATFORM ubuntu:22.04
LABEL org.opencontainers.image.source=https://github.com/paradigmxyz/reth
LABEL org.opencontainers.image.licenses="MIT OR Apache-2.0"
# Filled by docker buildx
ARG TARGETARCH
COPY ./dist/bin/$TARGETARCH/reth /usr/local/bin/reth
EXPOSE 30303 30303/udp 9001 8545 8546
ENTRYPOINT ["/usr/local/bin/reth"]

View File

@@ -1,99 +0,0 @@
# syntax=docker/dockerfile:1
# Unified Dockerfile for reth and op-reth, optimized for Depot builds
# Usage:
# reth: --build-arg BINARY=reth
# op-reth: --build-arg BINARY=op-reth --build-arg MANIFEST_PATH=crates/optimism/bin
FROM lukemathwalker/cargo-chef:latest-rust-1 AS chef
WORKDIR /app
LABEL org.opencontainers.image.source=https://github.com/paradigmxyz/reth
LABEL org.opencontainers.image.licenses="MIT OR Apache-2.0"
RUN apt-get update && apt-get install -y libclang-dev pkg-config
# Install sccache for compilation caching
RUN cargo install sccache --locked
ENV RUSTC_WRAPPER=sccache
ENV SCCACHE_DIR=/sccache
ENV SCCACHE_WEBDAV_ENDPOINT=https://cache.depot.dev
# Builds a cargo-chef plan
FROM chef AS planner
COPY --exclude=.git . .
RUN cargo chef prepare --recipe-path recipe.json
FROM chef AS builder
COPY --from=planner /app/recipe.json recipe.json
# Binary to build (reth or op-reth)
ARG BINARY=reth
# Manifest path for the binary
ARG MANIFEST_PATH=bin/reth
# Build profile, release by default
ARG BUILD_PROFILE=release
ENV BUILD_PROFILE=$BUILD_PROFILE
# Extra Cargo flags
ARG RUSTFLAGS=""
ENV RUSTFLAGS="$RUSTFLAGS"
# Extra Cargo features
ARG FEATURES=""
ENV FEATURES=$FEATURES
# Git info for vergen (since .git is excluded from Docker context)
ARG VERGEN_GIT_SHA=""
ARG VERGEN_GIT_DESCRIBE=""
ARG VERGEN_GIT_DIRTY="false"
ENV VERGEN_GIT_SHA=$VERGEN_GIT_SHA
ENV VERGEN_GIT_DESCRIBE=$VERGEN_GIT_DESCRIBE
ENV VERGEN_GIT_DIRTY=$VERGEN_GIT_DIRTY
# Build dependencies
RUN --mount=type=secret,id=DEPOT_TOKEN,env=SCCACHE_WEBDAV_TOKEN \
--mount=type=cache,target=/usr/local/cargo/registry,sharing=shared \
--mount=type=cache,target=/usr/local/cargo/git,sharing=shared \
--mount=type=cache,target=$SCCACHE_DIR,sharing=shared \
cargo chef cook --profile $BUILD_PROFILE --features "$FEATURES" --locked --recipe-path recipe.json --manifest-path $MANIFEST_PATH/Cargo.toml
# Build application
COPY --exclude=.git . .
RUN --mount=type=secret,id=DEPOT_TOKEN,env=SCCACHE_WEBDAV_TOKEN \
--mount=type=cache,target=/usr/local/cargo/registry,sharing=shared \
--mount=type=cache,target=/usr/local/cargo/git,sharing=shared \
--mount=type=cache,target=$SCCACHE_DIR,sharing=shared \
cargo build --profile $BUILD_PROFILE --features "$FEATURES" --locked --bin $BINARY --manifest-path $MANIFEST_PATH/Cargo.toml
RUN sccache --show-stats || true
# Copy binary to a known location (ARG not resolved in COPY)
# Note: Custom profiles like maxperf/profiling output to target/<profile>/, not target/release/
RUN cp /app/target/$BUILD_PROFILE/$BINARY /app/binary || \
cp /app/target/release/$BINARY /app/binary
FROM ubuntu:24.04 AS runtime
WORKDIR /app
# Binary name for entrypoint
ARG BINARY=reth
# Install runtime dependencies
RUN apt-get update && \
apt-get install -y --no-install-recommends ca-certificates && \
rm -rf /var/lib/apt/lists/*
# Copy binary from build stage and create canonical symlink for entrypoint
COPY --from=builder /app/binary /usr/local/bin/
RUN mv /usr/local/bin/binary /usr/local/bin/$BINARY && \
ln -s /usr/local/bin/$BINARY /usr/local/bin/reth-binary && \
chmod +x /usr/local/bin/$BINARY
# Copy licenses
COPY LICENSE-* ./
EXPOSE 30303 30303/udp 9001 8545 8546
ENTRYPOINT ["/usr/local/bin/reth-binary"]

15
DockerfileOp.cross Normal file
View File

@@ -0,0 +1,15 @@
# This image is meant to enable cross-architecture builds.
# It assumes the reth binary has already been compiled for `$TARGETPLATFORM` and is
# locatable in `./dist/bin/$TARGETARCH`
FROM --platform=$TARGETPLATFORM ubuntu:22.04
LABEL org.opencontainers.image.source=https://github.com/paradigmxyz/reth
LABEL org.opencontainers.image.licenses="MIT OR Apache-2.0"
# Filled by docker buildx
ARG TARGETARCH
COPY ./dist/bin/$TARGETARCH/op-reth /usr/local/bin/op-reth
EXPOSE 30303 30303/udp 9001 8545 8546
ENTRYPOINT ["/usr/local/bin/op-reth"]

134
Makefile
View File

@@ -35,6 +35,9 @@ EEST_TESTS_TAG := v4.5.0
EEST_TESTS_URL := https://github.com/ethereum/execution-spec-tests/releases/download/$(EEST_TESTS_TAG)/fixtures_stable.tar.gz
EEST_TESTS_DIR := ./testing/ef-tests/execution-spec-tests
# The docker image name
DOCKER_IMAGE_NAME ?= ghcr.io/paradigmxyz/reth
##@ Help
.PHONY: help
@@ -239,6 +242,137 @@ install-reth-bench: ## Build and install the reth binary under `$(CARGO_HOME)/bi
--features "$(FEATURES)" \
--profile "$(PROFILE)"
##@ Docker
# Note: This requires a buildx builder with emulation support. For example:
#
# `docker run --privileged --rm tonistiigi/binfmt --install amd64,arm64`
# `docker buildx create --use --driver docker-container --name cross-builder`
.PHONY: docker-build-push
docker-build-push: ## Build and push a cross-arch Docker image tagged with the latest git tag.
$(call docker_build_push,$(GIT_TAG),$(GIT_TAG))
# Note: This requires a buildx builder with emulation support. For example:
#
# `docker run --privileged --rm tonistiigi/binfmt --install amd64,arm64`
# `docker buildx create --use --driver docker-container --name cross-builder`
.PHONY: docker-build-push-git-sha
docker-build-push-git-sha: ## Build and push a cross-arch Docker image tagged with the latest git sha.
$(call docker_build_push,$(GIT_SHA),$(GIT_SHA))
# Note: This requires a buildx builder with emulation support. For example:
#
# `docker run --privileged --rm tonistiigi/binfmt --install amd64,arm64`
# `docker buildx create --use --driver docker-container --name cross-builder`
.PHONY: docker-build-push-latest
docker-build-push-latest: ## Build and push a cross-arch Docker image tagged with the latest git tag and `latest`.
$(call docker_build_push,$(GIT_TAG),latest)
# Note: This requires a buildx builder with emulation support. For example:
#
# `docker run --privileged --rm tonistiigi/binfmt --install amd64,arm64`
# `docker buildx create --use --name cross-builder`
.PHONY: docker-build-push-nightly
docker-build-push-nightly: ## Build and push cross-arch Docker image tagged with the latest git tag with a `-nightly` suffix, and `latest-nightly`.
$(call docker_build_push,nightly,nightly)
.PHONY: docker-build-push-nightly-edge-profiling
docker-build-push-nightly-edge-profiling: FEATURES := $(FEATURES) edge
docker-build-push-nightly-edge-profiling: ## Build and push cross-arch Docker image with edge features tagged with `nightly-edge-profiling`.
$(call docker_build_push,nightly-edge-profiling,nightly-edge-profiling)
# Create a cross-arch Docker image with the given tags and push it
define docker_build_push
$(MAKE) FEATURES="$(FEATURES)" build-x86_64-unknown-linux-gnu
mkdir -p $(BIN_DIR)/amd64
cp $(CARGO_TARGET_DIR)/x86_64-unknown-linux-gnu/$(PROFILE)/reth $(BIN_DIR)/amd64/reth
$(MAKE) FEATURES="$(FEATURES)" build-aarch64-unknown-linux-gnu
mkdir -p $(BIN_DIR)/arm64
cp $(CARGO_TARGET_DIR)/aarch64-unknown-linux-gnu/$(PROFILE)/reth $(BIN_DIR)/arm64/reth
docker buildx build --file ./Dockerfile.cross . \
--platform linux/amd64,linux/arm64 \
--tag $(DOCKER_IMAGE_NAME):$(1) \
--tag $(DOCKER_IMAGE_NAME):$(2) \
--provenance=false \
--push
endef
##@ Optimism docker
# Note: This requires a buildx builder with emulation support. For example:
#
# `docker run --privileged --rm tonistiigi/binfmt --install amd64,arm64`
# `docker buildx create --use --driver docker-container --name cross-builder`
.PHONY: op-docker-build-push
op-docker-build-push: ## Build and push a cross-arch Docker image tagged with the latest git tag.
$(call op_docker_build_push,$(GIT_TAG),$(GIT_TAG))
# Note: This requires a buildx builder with emulation support. For example:
#
# `docker run --privileged --rm tonistiigi/binfmt --install amd64,arm64`
# `docker buildx create --use --driver docker-container --name cross-builder`
.PHONY: op-docker-build-push-git-sha
op-docker-build-push-git-sha: ## Build and push a cross-arch Docker image tagged with the latest git sha.
$(call op_docker_build_push,$(GIT_SHA),$(GIT_SHA))
# Note: This requires a buildx builder with emulation support. For example:
#
# `docker run --privileged --rm tonistiigi/binfmt --install amd64,arm64`
# `docker buildx create --use --driver docker-container --name cross-builder`
.PHONY: op-docker-build-push-latest
op-docker-build-push-latest: ## Build and push a cross-arch Docker image tagged with the latest git tag and `latest`.
$(call op_docker_build_push,$(GIT_TAG),latest)
# Note: This requires a buildx builder with emulation support. For example:
#
# `docker run --privileged --rm tonistiigi/binfmt --install amd64,arm64`
# `docker buildx create --use --name cross-builder`
.PHONY: op-docker-build-push-nightly
op-docker-build-push-nightly: ## Build and push cross-arch Docker image tagged with the latest git tag with a `-nightly` suffix, and `latest-nightly`.
$(call op_docker_build_push,nightly,nightly)
.PHONY: op-docker-build-push-nightly-edge-profiling
op-docker-build-push-nightly-edge-profiling: FEATURES := $(FEATURES) edge
op-docker-build-push-nightly-edge-profiling: ## Build and push cross-arch Docker image with edge features tagged with `nightly-edge-profiling`.
$(call op_docker_build_push,nightly-edge-profiling,nightly-edge-profiling)
# Note: This requires a buildx builder with emulation support. For example:
#
# `docker run --privileged --rm tonistiigi/binfmt --install amd64,arm64`
# `docker buildx create --use --name cross-builder`
.PHONY: docker-build-push-nightly-profiling
docker-build-push-nightly-profiling: ## Build and push cross-arch Docker image with profiling profile tagged with nightly-profiling.
$(call docker_build_push,nightly-profiling,nightly-profiling)
# Note: This requires a buildx builder with emulation support. For example:
#
# `docker run --privileged --rm tonistiigi/binfmt --install amd64,arm64`
# `docker buildx create --use --name cross-builder`
.PHONY: op-docker-build-push-nightly-profiling
op-docker-build-push-nightly-profiling: ## Build and push cross-arch Docker image tagged with the latest git tag with a `-nightly` suffix, and `latest-nightly`.
$(call op_docker_build_push,nightly-profiling,nightly-profiling)
# Create a cross-arch Docker image with the given tags and push it
define op_docker_build_push
$(MAKE) FEATURES="$(FEATURES)" op-build-x86_64-unknown-linux-gnu
mkdir -p $(BIN_DIR)/amd64
cp $(CARGO_TARGET_DIR)/x86_64-unknown-linux-gnu/$(PROFILE)/op-reth $(BIN_DIR)/amd64/op-reth
$(MAKE) FEATURES="$(FEATURES)" op-build-aarch64-unknown-linux-gnu
mkdir -p $(BIN_DIR)/arm64
cp $(CARGO_TARGET_DIR)/aarch64-unknown-linux-gnu/$(PROFILE)/op-reth $(BIN_DIR)/arm64/op-reth
docker buildx build --file ./DockerfileOp.cross . \
--platform linux/amd64,linux/arm64 \
--tag $(DOCKER_IMAGE_NAME):$(1) \
--tag $(DOCKER_IMAGE_NAME):$(2) \
--provenance=false \
--push
endef
##@ Other
.PHONY: clean

View File

@@ -3,7 +3,7 @@
use crate::{
authenticated_transport::AuthenticatedTransportConnect,
bench::{
helpers::{build_payload, parse_gas_limit, prepare_payload_request, rpc_block_to_header},
helpers::{build_payload, prepare_payload_request, rpc_block_to_header},
output::GasRampPayloadFile,
},
valid_payload::{call_forkchoice_updated, call_new_payload, payload_to_new_payload},
@@ -22,6 +22,29 @@ use reth_primitives_traits::constants::{GAS_LIMIT_BOUND_DIVISOR, MAXIMUM_GAS_LIM
use std::{path::PathBuf, time::Instant};
use tracing::info;
/// Parses a gas limit value with optional suffix: K for thousand, M for million, G for billion.
///
/// Examples: "30000000", "30M", "1G", "2G"
fn parse_gas_limit(s: &str) -> eyre::Result<u64> {
let s = s.trim();
if s.is_empty() {
return Err(eyre::eyre!("empty value"));
}
let (num_str, multiplier) = if let Some(prefix) = s.strip_suffix(['G', 'g']) {
(prefix, 1_000_000_000u64)
} else if let Some(prefix) = s.strip_suffix(['M', 'm']) {
(prefix, 1_000_000u64)
} else if let Some(prefix) = s.strip_suffix(['K', 'k']) {
(prefix, 1_000u64)
} else {
(s, 1u64)
};
let base: u64 = num_str.trim().parse()?;
base.checked_mul(multiplier).ok_or_else(|| eyre::eyre!("value overflow"))
}
/// `reth benchmark gas-limit-ramp` command.
#[derive(Debug, Parser)]
pub struct Command {
@@ -214,3 +237,50 @@ const fn should_stop(mode: RampMode, blocks_processed: u64, current_gas_limit: u
RampMode::TargetGasLimit(target) => current_gas_limit >= target,
}
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_parse_gas_limit_plain_number() {
assert_eq!(parse_gas_limit("30000000").unwrap(), 30_000_000);
assert_eq!(parse_gas_limit("1").unwrap(), 1);
assert_eq!(parse_gas_limit("0").unwrap(), 0);
}
#[test]
fn test_parse_gas_limit_k_suffix() {
assert_eq!(parse_gas_limit("1K").unwrap(), 1_000);
assert_eq!(parse_gas_limit("30k").unwrap(), 30_000);
assert_eq!(parse_gas_limit("100K").unwrap(), 100_000);
}
#[test]
fn test_parse_gas_limit_m_suffix() {
assert_eq!(parse_gas_limit("1M").unwrap(), 1_000_000);
assert_eq!(parse_gas_limit("30m").unwrap(), 30_000_000);
assert_eq!(parse_gas_limit("100M").unwrap(), 100_000_000);
}
#[test]
fn test_parse_gas_limit_g_suffix() {
assert_eq!(parse_gas_limit("1G").unwrap(), 1_000_000_000);
assert_eq!(parse_gas_limit("2g").unwrap(), 2_000_000_000);
assert_eq!(parse_gas_limit("10G").unwrap(), 10_000_000_000);
}
#[test]
fn test_parse_gas_limit_with_whitespace() {
assert_eq!(parse_gas_limit(" 1G ").unwrap(), 1_000_000_000);
assert_eq!(parse_gas_limit("2 M").unwrap(), 2_000_000);
}
#[test]
fn test_parse_gas_limit_errors() {
assert!(parse_gas_limit("").is_err());
assert!(parse_gas_limit("abc").is_err());
assert!(parse_gas_limit("G").is_err());
assert!(parse_gas_limit("-1G").is_err());
}
}

View File

@@ -3,9 +3,7 @@
//! This command fetches transactions from existing blocks and packs them into a single
//! large block using the `testing_buildBlockV1` RPC endpoint.
use crate::{
authenticated_transport::AuthenticatedTransportConnect, bench::helpers::parse_gas_limit,
};
use crate::authenticated_transport::AuthenticatedTransportConnect;
use alloy_eips::{BlockNumberOrTag, Typed2718};
use alloy_primitives::{Bytes, B256};
use alloy_provider::{ext::EngineApi, network::AnyNetwork, Provider, RootProvider};
@@ -204,26 +202,13 @@ pub struct Command {
jwt_secret: std::path::PathBuf,
/// Target gas to pack into the block.
/// Accepts short notation: K for thousand, M for million, G for billion (e.g., 1G = 1
/// billion).
#[arg(long, value_name = "TARGET_GAS", default_value = "30000000", value_parser = parse_gas_limit)]
#[arg(long, value_name = "TARGET_GAS", default_value = "30000000")]
target_gas: u64,
/// Block number to start fetching transactions from (required).
///
/// This must be the last canonical block BEFORE any gas limit ramping was performed.
/// The command collects transactions from historical blocks starting at this number
/// to pack into large blocks.
///
/// How to determine this value:
/// - If starting from a fresh node (no gas limit ramp yet): use the current chain tip
/// - If gas limit ramping has already been performed: use the block number that was the chain
/// tip BEFORE ramping began (you must track this yourself)
///
/// Using a block after ramping started will cause transaction collection to fail
/// because those blocks contain synthetic transactions that cannot be replayed.
/// Starting block number to fetch transactions from.
/// If not specified, starts from the engine's latest block.
#[arg(long, value_name = "FROM_BLOCK")]
from_block: u64,
from_block: Option<u64>,
/// Execute the payload (call newPayload + forkchoiceUpdated).
/// If false, only builds the payload and prints it.
@@ -299,7 +284,7 @@ impl Command {
format!("Failed to create output directory: {:?}", self.output_dir)
})?;
let start_block = self.from_block;
let start_block = self.from_block.unwrap_or(parent_number);
// Use pipelined execution when generating multiple payloads
if self.count > 1 {

View File

@@ -1,29 +1,6 @@
//! Common helpers for reth-bench commands.
use crate::valid_payload::call_forkchoice_updated;
/// Parses a gas limit value with optional suffix: K for thousand, M for million, G for billion.
///
/// Examples: "30000000", "30M", "1G", "2G"
pub(crate) fn parse_gas_limit(s: &str) -> eyre::Result<u64> {
let s = s.trim();
if s.is_empty() {
return Err(eyre::eyre!("empty value"));
}
let (num_str, multiplier) = if let Some(prefix) = s.strip_suffix(['G', 'g']) {
(prefix, 1_000_000_000u64)
} else if let Some(prefix) = s.strip_suffix(['M', 'm']) {
(prefix, 1_000_000u64)
} else if let Some(prefix) = s.strip_suffix(['K', 'k']) {
(prefix, 1_000u64)
} else {
(s, 1u64)
};
let base: u64 = num_str.trim().parse()?;
base.checked_mul(multiplier).ok_or_else(|| eyre::eyre!("value overflow"))
}
use alloy_consensus::Header;
use alloy_eips::eip4844::kzg_to_versioned_hash;
use alloy_primitives::{Address, B256};
@@ -217,50 +194,3 @@ pub(crate) async fn get_payload_with_sidecar(
_ => panic!("This tool does not support getPayload versions past v5"),
}
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_parse_gas_limit_plain_number() {
assert_eq!(parse_gas_limit("30000000").unwrap(), 30_000_000);
assert_eq!(parse_gas_limit("1").unwrap(), 1);
assert_eq!(parse_gas_limit("0").unwrap(), 0);
}
#[test]
fn test_parse_gas_limit_k_suffix() {
assert_eq!(parse_gas_limit("1K").unwrap(), 1_000);
assert_eq!(parse_gas_limit("30k").unwrap(), 30_000);
assert_eq!(parse_gas_limit("100K").unwrap(), 100_000);
}
#[test]
fn test_parse_gas_limit_m_suffix() {
assert_eq!(parse_gas_limit("1M").unwrap(), 1_000_000);
assert_eq!(parse_gas_limit("30m").unwrap(), 30_000_000);
assert_eq!(parse_gas_limit("100M").unwrap(), 100_000_000);
}
#[test]
fn test_parse_gas_limit_g_suffix() {
assert_eq!(parse_gas_limit("1G").unwrap(), 1_000_000_000);
assert_eq!(parse_gas_limit("2g").unwrap(), 2_000_000_000);
assert_eq!(parse_gas_limit("10G").unwrap(), 10_000_000_000);
}
#[test]
fn test_parse_gas_limit_with_whitespace() {
assert_eq!(parse_gas_limit(" 1G ").unwrap(), 1_000_000_000);
assert_eq!(parse_gas_limit("2 M").unwrap(), 2_000_000);
}
#[test]
fn test_parse_gas_limit_errors() {
assert!(parse_gas_limit("").is_err());
assert!(parse_gas_limit("abc").is_err());
assert!(parse_gas_limit("G").is_err());
assert!(parse_gas_limit("-1G").is_err());
}
}

View File

@@ -25,8 +25,8 @@
//! - `jemalloc-unprefixed`: Uses unprefixed jemalloc symbols.
//! - `tracy-allocator`: Enables [Tracy](https://github.com/wolfpld/tracy) profiler allocator
//! integration for memory profiling.
//! - `snmalloc`: Uses [snmalloc](https://github.com/microsoft/snmalloc) as the global allocator.
//! Use `--no-default-features` when enabling this, as jemalloc takes precedence.
//! - `snmalloc`: Uses [snmalloc](https://github.com/snmalloc/snmalloc) as the global allocator. Use
//! `--no-default-features` when enabling this, as jemalloc takes precedence.
//! - `snmalloc-native`: Uses snmalloc with native CPU optimizations. Use `--no-default-features`
//! when enabling this.
//!

View File

@@ -206,33 +206,11 @@ impl DeferredTrieData {
Default::default(), // prefix_sets are per-block, not cumulative
);
// Only trigger COW clone if there's actually data to add.
#[cfg(feature = "rayon")]
{
rayon::join(
|| {
if !sorted_hashed_state.is_empty() {
Arc::make_mut(&mut overlay.state)
.extend_ref_and_sort(&sorted_hashed_state);
}
},
|| {
if !sorted_trie_updates.is_empty() {
Arc::make_mut(&mut overlay.nodes)
.extend_ref_and_sort(&sorted_trie_updates);
}
},
);
if !sorted_hashed_state.is_empty() {
Arc::make_mut(&mut overlay.state).extend_ref_and_sort(&sorted_hashed_state);
}
#[cfg(not(feature = "rayon"))]
{
if !sorted_hashed_state.is_empty() {
Arc::make_mut(&mut overlay.state)
.extend_ref_and_sort(&sorted_hashed_state);
}
if !sorted_trie_updates.is_empty() {
Arc::make_mut(&mut overlay.nodes)
.extend_ref_and_sort(&sorted_trie_updates);
}
if !sorted_trie_updates.is_empty() {
Arc::make_mut(&mut overlay.nodes).extend_ref_and_sort(&sorted_trie_updates);
}
overlay
}
@@ -265,8 +243,53 @@ impl DeferredTrieData {
/// In normal operation, the parent always has a cached overlay and this
/// function is never called.
///
/// Iterates ancestors oldest -> newest, then extends with current block's data,
/// so later state takes precedence.
/// When the `rayon` feature is enabled, uses parallel collection and merge:
/// 1. Collects ancestor data in parallel (each `wait_cloned()` may compute)
/// 2. Merges hashed state and trie updates in parallel with each other
/// 3. Uses tree reduction within each merge for O(log n) depth
#[cfg(feature = "rayon")]
fn merge_ancestors_into_overlay(
ancestors: &[Self],
sorted_hashed_state: &HashedPostStateSorted,
sorted_trie_updates: &TrieUpdatesSorted,
) -> TrieInputSorted {
// Early exit: no ancestors means just wrap current block's data
if ancestors.is_empty() {
return TrieInputSorted::new(
Arc::new(sorted_trie_updates.clone()),
Arc::new(sorted_hashed_state.clone()),
Default::default(),
);
}
// Collect ancestor data, unzipping states and updates into Arc slices
let (states, updates): (Vec<_>, Vec<_>) = ancestors
.iter()
.map(|a| {
let data = a.wait_cloned();
(data.hashed_state, data.trie_updates)
})
.unzip();
// Merge state and nodes in parallel with each other using tree reduction
let (state, nodes) = rayon::join(
|| {
let mut merged = HashedPostStateSorted::merge_parallel(&states);
merged.extend_ref_and_sort(sorted_hashed_state);
merged
},
|| {
let mut merged = TrieUpdatesSorted::merge_parallel(&updates);
merged.extend_ref_and_sort(sorted_trie_updates);
merged
},
);
TrieInputSorted::new(Arc::new(nodes), Arc::new(state), Default::default())
}
/// Merge all ancestors and current block's data into a single overlay (sequential fallback).
#[cfg(not(feature = "rayon"))]
fn merge_ancestors_into_overlay(
ancestors: &[Self],
sorted_hashed_state: &HashedPostStateSorted,
@@ -284,17 +307,8 @@ impl DeferredTrieData {
}
// Extend with current block's sorted data last (takes precedence)
#[cfg(feature = "rayon")]
rayon::join(
|| state_mut.extend_ref_and_sort(sorted_hashed_state),
|| nodes_mut.extend_ref_and_sort(sorted_trie_updates),
);
#[cfg(not(feature = "rayon"))]
{
state_mut.extend_ref_and_sort(sorted_hashed_state);
nodes_mut.extend_ref_and_sort(sorted_trie_updates);
}
state_mut.extend_ref_and_sort(sorted_hashed_state);
nodes_mut.extend_ref_and_sort(sorted_trie_updates);
overlay
}

View File

@@ -17,10 +17,7 @@ use reth_primitives_traits::{
SignedTransaction,
};
use reth_storage_api::StateProviderBox;
use reth_trie::{
updates::TrieUpdatesSorted, HashedPostStateSorted, LazyTrieData, SortedTrieData,
TrieInputSorted,
};
use reth_trie::{updates::TrieUpdatesSorted, HashedPostStateSorted, LazyTrieData, TrieInputSorted};
use std::{collections::BTreeMap, sync::Arc, time::Instant};
use tokio::sync::{broadcast, watch};
@@ -951,36 +948,22 @@ impl<N: NodePrimitives<SignedTx: SignedTransaction>> NewCanonicalChain<N> {
match blocks {
[] => Chain::default(),
[first, rest @ ..] => {
let trie_data_handle = first.trie_data_handle();
let mut chain = Chain::from_block(
first.recovered_block().clone(),
ExecutionOutcome::from((
first.execution_outcome().clone(),
first.block_number(),
)),
LazyTrieData::deferred(move || {
let trie_data = trie_data_handle.wait_cloned();
SortedTrieData {
hashed_state: trie_data.hashed_state,
trie_updates: trie_data.trie_updates,
}
}),
LazyTrieData::ready(first.hashed_state(), first.trie_updates()),
);
for exec in rest {
let trie_data_handle = exec.trie_data_handle();
chain.append_block(
exec.recovered_block().clone(),
ExecutionOutcome::from((
exec.execution_outcome().clone(),
exec.block_number(),
)),
LazyTrieData::deferred(move || {
let trie_data = trie_data_handle.wait_cloned();
SortedTrieData {
hashed_state: trie_data.hashed_state,
trie_updates: trie_data.trie_updates,
}
}),
LazyTrieData::ready(exec.hashed_state(), exec.trie_updates()),
);
}
chain

View File

@@ -25,7 +25,6 @@ pub use alloy_chains::{Chain, ChainKind, NamedChain};
/// Re-export for convenience
pub use reth_ethereum_forks::*;
pub use alloy_evm::EvmLimitParams;
pub use api::EthChainSpec;
pub use info::ChainInfo;
#[cfg(any(test, feature = "test-utils"))]

View File

@@ -205,16 +205,6 @@ impl Command {
.add_cell(Cell::new(human_bytes(total_size as f64)))
.add_cell(Cell::new(human_bytes(total_pending as f64)));
table.add_row(row);
let wal_size = tool.provider_factory.rocksdb_provider().wal_size_bytes();
let mut row = Row::new();
row.add_cell(Cell::new("WAL"))
.add_cell(Cell::new(""))
.add_cell(Cell::new(""))
.add_cell(Cell::new(""))
.add_cell(Cell::new(human_bytes(wal_size as f64)))
.add_cell(Cell::new(""));
table.add_row(row);
}
table

View File

@@ -26,14 +26,6 @@ pub struct ImportCommand<C: ChainSpecParser> {
#[arg(long, value_name = "CHUNK_LEN", verbatim_doc_comment)]
chunk_len: Option<u64>,
/// Fail immediately when an invalid block is encountered.
///
/// By default, the import will stop at the last valid block if an invalid block is
/// encountered during execution or validation, leaving the database at the last valid
/// block state. When this flag is set, the import will instead fail with an error.
#[arg(long, verbatim_doc_comment)]
fail_on_invalid_block: bool,
/// The path(s) to block file(s) for import.
///
/// The online stages (headers and bodies) are replaced by a file import, after which the
@@ -60,11 +52,7 @@ impl<C: ChainSpecParser<ChainSpec: EthChainSpec + EthereumHardforks>> ImportComm
info!(target: "reth::cli", "Starting import of {} file(s)", self.paths.len());
let import_config = ImportConfig {
no_state: self.no_state,
chunk_len: self.chunk_len,
fail_on_invalid_block: self.fail_on_invalid_block,
};
let import_config = ImportConfig { no_state: self.no_state, chunk_len: self.chunk_len };
let executor = components.evm_config().clone();
let consensus = Arc::new(components.consensus().clone());
@@ -93,20 +81,7 @@ impl<C: ChainSpecParser<ChainSpec: EthChainSpec + EthereumHardforks>> ImportComm
total_decoded_blocks += result.total_decoded_blocks;
total_decoded_txns += result.total_decoded_txns;
// Check if we stopped due to an invalid block
if result.stopped_on_invalid_block {
info!(target: "reth::cli",
"Stopped at last valid block {} due to invalid block {} in file: {}. Imported {} blocks, {} transactions",
result.last_valid_block.unwrap_or(0),
result.bad_block.unwrap_or(0),
path.display(),
result.total_imported_blocks,
result.total_imported_txns);
// Stop importing further files and exit successfully
break;
}
if !result.is_successful() {
if !result.is_complete() {
return Err(eyre::eyre!(
"Chain was partially imported from file: {}. Imported {}/{} blocks, {}/{} transactions",
path.display(),
@@ -123,7 +98,7 @@ impl<C: ChainSpecParser<ChainSpec: EthChainSpec + EthereumHardforks>> ImportComm
}
info!(target: "reth::cli",
"Import complete. Total: {}/{} blocks, {}/{} transactions",
"All files imported successfully. Total: {}/{} blocks, {}/{} transactions",
total_imported_blocks, total_decoded_blocks, total_imported_txns, total_decoded_txns);
Ok(())
@@ -164,20 +139,4 @@ mod tests {
assert_eq!(args.paths[1], PathBuf::from("file2.rlp"));
assert_eq!(args.paths[2], PathBuf::from("file3.rlp"));
}
#[test]
fn parse_import_command_with_fail_on_invalid_block() {
let args: ImportCommand<EthereumChainSpecParser> =
ImportCommand::parse_from(["reth", "--fail-on-invalid-block", "chain.rlp"]);
assert!(args.fail_on_invalid_block);
assert_eq!(args.paths.len(), 1);
assert_eq!(args.paths[0], PathBuf::from("chain.rlp"));
}
#[test]
fn parse_import_command_default_stops_on_invalid_block() {
let args: ImportCommand<EthereumChainSpecParser> =
ImportCommand::parse_from(["reth", "chain.rlp"]);
assert!(!args.fail_on_invalid_block);
}
}

View File

@@ -22,11 +22,11 @@ use reth_provider::{
StageCheckpointReader,
};
use reth_prune::PruneModes;
use reth_stages::{prelude::*, ControlFlow, Pipeline, StageId, StageSet};
use reth_stages::{prelude::*, Pipeline, StageId, StageSet};
use reth_static_file::StaticFileProducer;
use std::{path::Path, sync::Arc};
use tokio::sync::watch;
use tracing::{debug, error, info, warn};
use tracing::{debug, error, info};
/// Configuration for importing blocks from RLP files.
#[derive(Debug, Clone, Default)]
@@ -35,9 +35,6 @@ pub struct ImportConfig {
pub no_state: bool,
/// Chunk byte length to read from file.
pub chunk_len: Option<u64>,
/// If true, fail immediately when an invalid block is encountered.
/// By default (false), the import stops at the last valid block and exits successfully.
pub fail_on_invalid_block: bool,
}
/// Result of an import operation.
@@ -51,12 +48,6 @@ pub struct ImportResult {
pub total_imported_blocks: usize,
/// Total number of transactions imported into the database.
pub total_imported_txns: usize,
/// Whether the import was stopped due to an invalid block.
pub stopped_on_invalid_block: bool,
/// The block number that was invalid, if any.
pub bad_block: Option<u64>,
/// The last valid block number when stopped due to invalid block.
pub last_valid_block: Option<u64>,
}
impl ImportResult {
@@ -65,14 +56,6 @@ impl ImportResult {
self.total_decoded_blocks == self.total_imported_blocks &&
self.total_decoded_txns == self.total_imported_txns
}
/// Returns true if the import was successful, considering stop-on-invalid-block mode.
///
/// In stop-on-invalid-block mode, a partial import is considered successful if we
/// stopped due to an invalid block (leaving the DB at the last valid block).
pub fn is_successful(&self) -> bool {
self.is_complete() || self.stopped_on_invalid_block
}
}
/// Imports blocks from an RLP-encoded file into the database.
@@ -120,11 +103,6 @@ where
let static_file_producer =
StaticFileProducer::new(provider_factory.clone(), PruneModes::default());
// Track if we stopped due to an invalid block
let mut stopped_on_invalid_block = false;
let mut bad_block_number: Option<u64> = None;
let mut last_valid_block_number: Option<u64> = None;
while let Some(file_client) =
reader.next_chunk::<BlockTy<N>>(consensus.clone(), Some(sealed_header)).await?
{
@@ -159,51 +137,12 @@ where
// Run pipeline
info!(target: "reth::import", "Starting sync pipeline");
if import_config.fail_on_invalid_block {
// Original behavior: fail on unwind
tokio::select! {
res = pipeline.run() => res?,
_ = tokio::signal::ctrl_c() => {
info!(target: "reth::import", "Import interrupted by user");
break;
},
}
} else {
// Default behavior: Use run_loop() to handle unwinds gracefully
let result = tokio::select! {
res = pipeline.run_loop() => res,
_ = tokio::signal::ctrl_c() => {
info!(target: "reth::import", "Import interrupted by user");
break;
},
};
match result {
Ok(ControlFlow::Unwind { target, bad_block }) => {
// An invalid block was encountered; stop at last valid block
let bad = bad_block.block.number;
warn!(
target: "reth::import",
bad_block = bad,
last_valid_block = target,
"Invalid block encountered during import; stopping at last valid block"
);
stopped_on_invalid_block = true;
bad_block_number = Some(bad);
last_valid_block_number = Some(target);
break;
}
Ok(ControlFlow::Continue { block_number }) => {
debug!(target: "reth::import", block_number, "Pipeline chunk completed");
}
Ok(ControlFlow::NoProgress { block_number }) => {
debug!(target: "reth::import", ?block_number, "Pipeline made no progress");
}
Err(e) => {
// Propagate other pipeline errors
return Err(e.into());
}
}
tokio::select! {
res = pipeline.run() => res?,
_ = tokio::signal::ctrl_c() => {
info!(target: "reth::import", "Import interrupted by user");
break;
},
}
sealed_header = provider_factory
@@ -221,20 +160,9 @@ where
total_decoded_txns,
total_imported_blocks,
total_imported_txns,
stopped_on_invalid_block,
bad_block: bad_block_number,
last_valid_block: last_valid_block_number,
};
if result.stopped_on_invalid_block {
info!(target: "reth::import",
total_imported_blocks,
total_imported_txns,
bad_block = ?result.bad_block,
last_valid_block = ?result.last_valid_block,
"Import stopped at last valid block due to invalid block"
);
} else if !result.is_complete() {
if !result.is_complete() {
error!(target: "reth::import",
total_decoded_blocks,
total_imported_blocks,

View File

@@ -1,34 +0,0 @@
//! Enode identifier command
use clap::Parser;
use reth_cli_util::get_secret_key;
use reth_network_peers::NodeRecord;
use std::{
net::{IpAddr, Ipv4Addr, SocketAddr},
path::PathBuf,
};
/// Print the enode identifier for a given secret key.
#[derive(Parser, Debug)]
pub struct Command {
/// Path to the secret key file for discovery.
pub discovery_secret: PathBuf,
/// Optional IP address to include in the enode URL.
///
/// If not provided, defaults to 0.0.0.0.
#[arg(long)]
pub ip: Option<IpAddr>,
}
impl Command {
/// Execute the enode command.
pub fn execute(self) -> eyre::Result<()> {
let sk = get_secret_key(&self.discovery_secret)?;
let ip = self.ip.unwrap_or(IpAddr::V4(Ipv4Addr::UNSPECIFIED));
let addr = SocketAddr::new(ip, 30303);
let enr = NodeRecord::from_secret_key(addr, &sk);
println!("{enr}");
Ok(())
}
}

View File

@@ -18,7 +18,6 @@ use reth_node_core::{
};
pub mod bootnode;
pub mod enode;
pub mod rlpx;
/// `reth p2p` command
@@ -86,9 +85,6 @@ impl<C: ChainSpecParser<ChainSpec: EthChainSpec + Hardforks + EthereumHardforks>
Subcommands::Bootnode(command) => {
command.execute().await?;
}
Subcommands::Enode(command) => {
command.execute()?;
}
}
Ok(())
@@ -103,7 +99,6 @@ impl<C: ChainSpecParser> Command<C> {
Subcommands::Body { args, .. } => Some(&args.chain),
Subcommands::Rlpx(_) => None,
Subcommands::Bootnode(_) => None,
Subcommands::Enode(_) => None,
}
}
}
@@ -131,8 +126,6 @@ pub enum Subcommands<C: ChainSpecParser> {
Rlpx(rlpx::Command),
/// Bootnode command
Bootnode(bootnode::Command),
/// Print enode identifier
Enode(enode::Command),
}
#[derive(Debug, Clone, Parser)]
@@ -232,16 +225,4 @@ mod tests {
let _args: Command<EthereumChainSpecParser> =
Command::parse_from(["reth", "body", "--chain", "mainnet", "1000"]);
}
#[test]
fn parse_enode_cmd() {
let _args: Command<EthereumChainSpecParser> =
Command::parse_from(["reth", "enode", "/tmp/secret"]);
}
#[test]
fn parse_enode_cmd_with_ip() {
let _args: Command<EthereumChainSpecParser> =
Command::parse_from(["reth", "enode", "/tmp/secret", "--ip", "192.168.1.1"]);
}
}

View File

@@ -1,11 +1,14 @@
//! Collection of methods for block validation.
use alloy_consensus::{BlockHeader as _, EMPTY_OMMER_ROOT_HASH};
use alloy_consensus::{BlockHeader as _, Transaction, EMPTY_OMMER_ROOT_HASH};
use alloy_eips::{eip4844::DATA_GAS_PER_BLOB, eip7840::BlobParams};
use reth_chainspec::{EthChainSpec, EthereumHardfork, EthereumHardforks};
use reth_consensus::ConsensusError;
use reth_consensus::{ConsensusError, TxGasLimitTooHighErr};
use reth_primitives_traits::{
constants::{GAS_LIMIT_BOUND_DIVISOR, MAXIMUM_GAS_LIMIT_BLOCK, MINIMUM_GAS_LIMIT},
constants::{
GAS_LIMIT_BOUND_DIVISOR, MAXIMUM_GAS_LIMIT_BLOCK, MAX_TX_GAS_LIMIT_OSAKA, MINIMUM_GAS_LIMIT,
},
transaction::TxHashRef,
Block, BlockBody, BlockHeader, GotExpected, SealedBlock, SealedHeader,
};
@@ -143,7 +146,7 @@ pub fn validate_block_pre_execution<B, ChainSpec>(
) -> Result<(), ConsensusError>
where
B: Block,
ChainSpec: EthChainSpec + EthereumHardforks,
ChainSpec: EthereumHardforks,
{
post_merge_hardfork_fields(block, chain_spec)?;
@@ -151,6 +154,19 @@ where
if let Err(error) = block.ensure_transaction_root_valid() {
return Err(ConsensusError::BodyTransactionRootDiff(error.into()))
}
// EIP-7825 validation
if chain_spec.is_osaka_active_at_timestamp(block.timestamp()) {
for tx in block.body().transactions() {
if tx.gas_limit() > MAX_TX_GAS_LIMIT_OSAKA {
return Err(TxGasLimitTooHighErr {
tx_hash: *tx.tx_hash(),
gas_limit: tx.gas_limit(),
max_allowed: MAX_TX_GAS_LIMIT_OSAKA,
}
.into());
}
}
}
Ok(())
}

View File

@@ -72,11 +72,3 @@ derive_more.workspace = true
[[test]]
name = "e2e_testsuite"
path = "tests/e2e-testsuite/main.rs"
[[test]]
name = "rocksdb"
path = "tests/rocksdb/main.rs"
required-features = ["edge"]
[features]
edge = ["reth-node-core/edge", "reth-provider/rocksdb", "reth-cli-commands/edge"]

View File

@@ -103,10 +103,7 @@ where
N: NodeBuilderHelper,
{
E2ETestSetupBuilder::new(num_nodes, chain_spec, attributes_generator)
.with_tree_config_modifier(move |base| {
// Apply caller's tree_config but preserve the small cache size from base
tree_config.clone().with_cross_block_cache_size(base.cross_block_cache_size())
})
.with_tree_config_modifier(move |_| tree_config.clone())
.with_node_config_modifier(move |config| config.set_dev(is_dev))
.with_connect_nodes(connect_nodes)
.build()

View File

@@ -112,13 +112,11 @@ where
..NetworkArgs::default()
};
// Apply tree config modifier if present, with test-appropriate defaults
let base_tree_config =
reth_node_api::TreeConfig::default().with_cross_block_cache_size(1024 * 1024);
// Apply tree config modifier if present
let tree_config = if let Some(modifier) = self.tree_config_modifier {
modifier(base_tree_config)
modifier(reth_node_api::TreeConfig::default())
} else {
base_tree_config
reth_node_api::TreeConfig::default()
};
let mut nodes = (0..self.num_nodes)

View File

@@ -38,18 +38,6 @@ impl TransactionTestContext {
signed.encoded_2718().into()
}
/// Creates a transfer with a specific nonce and signs it, returning bytes.
/// Uses high `max_fee_per_gas` (1000 gwei) to ensure tx acceptance regardless of basefee.
pub async fn transfer_tx_bytes_with_nonce(
chain_id: u64,
wallet: PrivateKeySigner,
nonce: u64,
) -> Bytes {
let tx = tx(chain_id, 21000, None, None, nonce, Some(1000e9 as u128));
let signed = Self::sign_tx(wallet, tx).await;
signed.encoded_2718().into()
}
/// Creates a deployment transaction and signs it, returning an envelope.
pub async fn deploy_tx(
chain_id: u64,

View File

@@ -1,470 +0,0 @@
//! E2E tests for `RocksDB` provider functionality.
#![cfg(all(feature = "edge", unix))]
use alloy_consensus::BlockHeader;
use alloy_primitives::B256;
use alloy_rpc_types_eth::{Transaction, TransactionReceipt};
use eyre::Result;
use jsonrpsee::core::client::ClientT;
use reth_chainspec::{ChainSpec, ChainSpecBuilder, MAINNET};
use reth_db::tables;
use reth_e2e_test_utils::{transaction::TransactionTestContext, wallet, E2ETestSetupBuilder};
use reth_node_builder::NodeConfig;
use reth_node_core::args::RocksDbArgs;
use reth_node_ethereum::EthereumNode;
use reth_payload_builder::EthPayloadBuilderAttributes;
use reth_provider::RocksDBProviderFactory;
use std::{sync::Arc, time::Duration};
const ROCKSDB_POLL_TIMEOUT: Duration = Duration::from_secs(60);
const ROCKSDB_POLL_INTERVAL: Duration = Duration::from_millis(50);
/// Polls RPC until the given `tx_hash` is visible as pending (not yet mined).
/// Prevents race conditions where `advance_block` is called before txs are in the pool.
/// Returns the pending transaction.
async fn wait_for_pending_tx<C: ClientT>(client: &C, tx_hash: B256) -> Transaction {
let start = std::time::Instant::now();
loop {
let tx: Option<Transaction> = client
.request("eth_getTransactionByHash", [tx_hash])
.await
.expect("RPC request failed");
if let Some(tx) = tx {
assert!(
tx.block_number.is_none(),
"Expected pending tx but tx_hash={tx_hash:?} is already mined in block {:?}",
tx.block_number
);
return tx;
}
assert!(
start.elapsed() < ROCKSDB_POLL_TIMEOUT,
"Timed out after {:?} waiting for tx_hash={tx_hash:?} to appear in pending pool",
start.elapsed()
);
tokio::time::sleep(ROCKSDB_POLL_INTERVAL).await;
}
}
/// Polls `RocksDB` until the given `tx_hash` appears in `TransactionHashNumbers`.
/// Returns the `tx_number` on success, or panics on timeout.
async fn poll_tx_in_rocksdb<P: RocksDBProviderFactory>(provider: &P, tx_hash: B256) -> u64 {
let start = std::time::Instant::now();
let mut interval = ROCKSDB_POLL_INTERVAL;
loop {
// Re-acquire handle each iteration to avoid stale snapshot reads
let rocksdb = provider.rocksdb_provider();
let tx_number: Option<u64> =
rocksdb.get::<tables::TransactionHashNumbers>(tx_hash).expect("RocksDB get failed");
if let Some(n) = tx_number {
return n;
}
assert!(
start.elapsed() < ROCKSDB_POLL_TIMEOUT,
"Timed out after {:?} waiting for tx_hash={tx_hash:?} in RocksDB",
start.elapsed()
);
tokio::time::sleep(interval).await;
// Simple backoff: 50ms -> 100ms -> 200ms (capped)
interval = std::cmp::min(interval * 2, Duration::from_millis(200));
}
}
/// Returns the test chain spec for `RocksDB` tests.
fn test_chain_spec() -> Arc<ChainSpec> {
Arc::new(
ChainSpecBuilder::default()
.chain(MAINNET.chain)
.genesis(
serde_json::from_str(include_str!("../../src/testsuite/assets/genesis.json"))
.expect("failed to parse genesis.json"),
)
.cancun_activated()
.build(),
)
}
/// Returns test payload attributes for the given timestamp.
fn test_attributes_generator(timestamp: u64) -> EthPayloadBuilderAttributes {
let attributes = alloy_rpc_types_engine::PayloadAttributes {
timestamp,
prev_randao: B256::ZERO,
suggested_fee_recipient: alloy_primitives::Address::ZERO,
withdrawals: Some(vec![]),
parent_beacon_block_root: Some(B256::ZERO),
};
EthPayloadBuilderAttributes::new(B256::ZERO, attributes)
}
/// Enables `RocksDB` for `TransactionHashNumbers` table.
///
/// Note: Static file changesets are disabled because `persistence_threshold(0)` causes
/// a race where the static file writer expects sequential block numbers but receives
/// them out of order, resulting in `UnexpectedStaticFileBlockNumber` errors.
fn with_rocksdb_enabled<C>(mut config: NodeConfig<C>) -> NodeConfig<C> {
config.rocksdb = RocksDbArgs { tx_hash: true, ..Default::default() };
config.static_files.storage_changesets = false;
config.static_files.account_changesets = false;
config
}
/// Smoke test: node boots with `RocksDB` routing enabled.
#[tokio::test]
async fn test_rocksdb_node_startup() -> Result<()> {
reth_tracing::init_test_tracing();
let chain_spec = test_chain_spec();
let (nodes, _tasks, _wallet) =
E2ETestSetupBuilder::<EthereumNode, _>::new(1, chain_spec, test_attributes_generator)
.with_node_config_modifier(with_rocksdb_enabled)
.build()
.await?;
assert_eq!(nodes.len(), 1);
// Verify RocksDB provider is functional (can query without error)
let rocksdb = nodes[0].inner.provider.rocksdb_provider();
let missing_hash = B256::from([0xab; 32]);
let result: Option<u64> = rocksdb.get::<tables::TransactionHashNumbers>(missing_hash)?;
assert!(result.is_none(), "Missing hash should return None");
let genesis_hash = nodes[0].block_hash(0);
assert_ne!(genesis_hash, B256::ZERO);
Ok(())
}
/// Block mining works with `RocksDB` storage.
#[tokio::test]
async fn test_rocksdb_block_mining() -> Result<()> {
reth_tracing::init_test_tracing();
let chain_spec = test_chain_spec();
let chain_id = chain_spec.chain().id();
let (mut nodes, _tasks, _wallet) =
E2ETestSetupBuilder::<EthereumNode, _>::new(1, chain_spec, test_attributes_generator)
.with_node_config_modifier(with_rocksdb_enabled)
.build()
.await?;
assert_eq!(nodes.len(), 1);
let genesis_hash = nodes[0].block_hash(0);
assert_ne!(genesis_hash, B256::ZERO);
// Mine 3 blocks with transactions
let wallets = wallet::Wallet::new(1).with_chain_id(chain_id).wallet_gen();
let signer = wallets[0].clone();
let client = nodes[0].rpc_client().expect("RPC client should be available");
for i in 1..=3u64 {
let raw_tx =
TransactionTestContext::transfer_tx_bytes_with_nonce(chain_id, signer.clone(), i - 1)
.await;
let tx_hash = nodes[0].rpc.inject_tx(raw_tx).await?;
// Wait for tx to enter pending pool before mining
wait_for_pending_tx(&client, tx_hash).await;
let payload = nodes[0].advance_block().await?;
let block = payload.block();
assert_eq!(block.number(), i);
assert_ne!(block.hash(), B256::ZERO);
// Verify tx was actually included in the block
let receipt: Option<TransactionReceipt> =
client.request("eth_getTransactionReceipt", [tx_hash]).await?;
let receipt = receipt.expect("Receipt should exist after mining");
assert_eq!(receipt.block_number, Some(i), "Tx should be in block {i}");
}
// Verify all blocks are stored
for i in 0..=3 {
let block_hash = nodes[0].block_hash(i);
assert_ne!(block_hash, B256::ZERO);
}
Ok(())
}
/// Tx hash lookup exercises `TransactionHashNumbers` table.
#[tokio::test]
async fn test_rocksdb_transaction_queries() -> Result<()> {
reth_tracing::init_test_tracing();
let chain_spec = test_chain_spec();
let chain_id = chain_spec.chain().id();
let (mut nodes, _tasks, _) = E2ETestSetupBuilder::<EthereumNode, _>::new(
1,
chain_spec.clone(),
test_attributes_generator,
)
.with_node_config_modifier(with_rocksdb_enabled)
.with_tree_config_modifier(|config| config.with_persistence_threshold(0))
.build()
.await?;
assert_eq!(nodes.len(), 1);
// Inject and mine a transaction
let wallets = wallet::Wallet::new(1).with_chain_id(chain_id).wallet_gen();
let signer = wallets[0].clone();
let client = nodes[0].rpc_client().expect("RPC client should be available");
let raw_tx = TransactionTestContext::transfer_tx_bytes(chain_id, signer).await;
let tx_hash = nodes[0].rpc.inject_tx(raw_tx).await?;
// Wait for tx to enter pending pool before mining
wait_for_pending_tx(&client, tx_hash).await;
let payload = nodes[0].advance_block().await?;
assert_eq!(payload.block().number(), 1);
// Query each transaction by hash
let tx: Option<Transaction> = client.request("eth_getTransactionByHash", [tx_hash]).await?;
let tx = tx.expect("Transaction should be found");
assert_eq!(tx.block_number, Some(1));
let receipt: Option<TransactionReceipt> =
client.request("eth_getTransactionReceipt", [tx_hash]).await?;
let receipt = receipt.expect("Receipt should be found");
assert_eq!(receipt.block_number, Some(1));
assert!(receipt.status());
// Direct RocksDB assertion - poll with timeout since persistence is async
let tx_number = poll_tx_in_rocksdb(&nodes[0].inner.provider, tx_hash).await;
assert_eq!(tx_number, 0, "First tx should have TxNumber 0");
// Verify missing hash returns None
let missing_hash = B256::from([0xde; 32]);
let rocksdb = nodes[0].inner.provider.rocksdb_provider();
let missing_tx_number: Option<u64> =
rocksdb.get::<tables::TransactionHashNumbers>(missing_hash)?;
assert!(missing_tx_number.is_none());
let missing_tx: Option<Transaction> =
client.request("eth_getTransactionByHash", [missing_hash]).await?;
assert!(missing_tx.is_none(), "expected no transaction for missing hash");
let missing_receipt: Option<TransactionReceipt> =
client.request("eth_getTransactionReceipt", [missing_hash]).await?;
assert!(missing_receipt.is_none(), "expected no receipt for missing hash");
Ok(())
}
/// Multiple transactions in the same block are correctly persisted to `RocksDB`.
#[tokio::test]
async fn test_rocksdb_multi_tx_same_block() -> Result<()> {
reth_tracing::init_test_tracing();
let chain_spec = test_chain_spec();
let chain_id = chain_spec.chain().id();
let (mut nodes, _tasks, _) = E2ETestSetupBuilder::<EthereumNode, _>::new(
1,
chain_spec.clone(),
test_attributes_generator,
)
.with_node_config_modifier(with_rocksdb_enabled)
.with_tree_config_modifier(|config| config.with_persistence_threshold(0))
.build()
.await?;
// Create 3 txs from the same wallet with sequential nonces
let wallets = wallet::Wallet::new(1).with_chain_id(chain_id).wallet_gen();
let signer = wallets[0].clone();
let client = nodes[0].rpc_client().expect("RPC client");
let mut tx_hashes = Vec::new();
for nonce in 0..3 {
let raw_tx =
TransactionTestContext::transfer_tx_bytes_with_nonce(chain_id, signer.clone(), nonce)
.await;
let tx_hash = nodes[0].rpc.inject_tx(raw_tx).await?;
tx_hashes.push(tx_hash);
}
// Wait for all txs to appear in pending pool before mining
for tx_hash in &tx_hashes {
wait_for_pending_tx(&client, *tx_hash).await;
}
// Mine one block containing all 3 txs
let payload = nodes[0].advance_block().await?;
assert_eq!(payload.block().number(), 1);
// Verify block contains all 3 txs
let block: Option<alloy_rpc_types_eth::Block> =
client.request("eth_getBlockByNumber", ("0x1", true)).await?;
let block = block.expect("Block 1 should exist");
assert_eq!(block.transactions.len(), 3, "Block should contain 3 txs");
// Verify each tx via RPC
for tx_hash in &tx_hashes {
let tx: Option<Transaction> = client.request("eth_getTransactionByHash", [tx_hash]).await?;
let tx = tx.expect("Transaction should be found");
assert_eq!(tx.block_number, Some(1), "All txs should be in block 1");
}
// Poll RocksDB for all tx hashes and collect tx_numbers
let mut tx_numbers = Vec::new();
for tx_hash in &tx_hashes {
let n = poll_tx_in_rocksdb(&nodes[0].inner.provider, *tx_hash).await;
tx_numbers.push(n);
}
// Verify tx_numbers form the set {0, 1, 2}
tx_numbers.sort();
assert_eq!(tx_numbers, vec![0, 1, 2], "TxNumbers should be 0, 1, 2");
Ok(())
}
/// Transactions across multiple blocks have globally continuous `tx_numbers`.
#[tokio::test]
async fn test_rocksdb_txs_across_blocks() -> Result<()> {
reth_tracing::init_test_tracing();
let chain_spec = test_chain_spec();
let chain_id = chain_spec.chain().id();
let (mut nodes, _tasks, _) = E2ETestSetupBuilder::<EthereumNode, _>::new(
1,
chain_spec.clone(),
test_attributes_generator,
)
.with_node_config_modifier(with_rocksdb_enabled)
.with_tree_config_modifier(|config| config.with_persistence_threshold(0))
.build()
.await?;
let wallets = wallet::Wallet::new(1).with_chain_id(chain_id).wallet_gen();
let signer = wallets[0].clone();
let client = nodes[0].rpc_client().expect("RPC client");
// Block 1: 2 transactions
let tx_hash_0 = nodes[0]
.rpc
.inject_tx(
TransactionTestContext::transfer_tx_bytes_with_nonce(chain_id, signer.clone(), 0).await,
)
.await?;
let tx_hash_1 = nodes[0]
.rpc
.inject_tx(
TransactionTestContext::transfer_tx_bytes_with_nonce(chain_id, signer.clone(), 1).await,
)
.await?;
// Wait for both txs to appear in pending pool
wait_for_pending_tx(&client, tx_hash_0).await;
wait_for_pending_tx(&client, tx_hash_1).await;
let payload1 = nodes[0].advance_block().await?;
assert_eq!(payload1.block().number(), 1);
// Block 2: 1 transaction
let tx_hash_2 = nodes[0]
.rpc
.inject_tx(
TransactionTestContext::transfer_tx_bytes_with_nonce(chain_id, signer.clone(), 2).await,
)
.await?;
wait_for_pending_tx(&client, tx_hash_2).await;
let payload2 = nodes[0].advance_block().await?;
assert_eq!(payload2.block().number(), 2);
// Verify block contents via RPC
let tx0: Option<Transaction> = client.request("eth_getTransactionByHash", [tx_hash_0]).await?;
let tx1: Option<Transaction> = client.request("eth_getTransactionByHash", [tx_hash_1]).await?;
let tx2: Option<Transaction> = client.request("eth_getTransactionByHash", [tx_hash_2]).await?;
assert_eq!(tx0.expect("tx0").block_number, Some(1));
assert_eq!(tx1.expect("tx1").block_number, Some(1));
assert_eq!(tx2.expect("tx2").block_number, Some(2));
// Poll RocksDB and verify global tx_number continuity
let all_tx_hashes = [tx_hash_0, tx_hash_1, tx_hash_2];
let mut tx_numbers = Vec::new();
for tx_hash in &all_tx_hashes {
let n = poll_tx_in_rocksdb(&nodes[0].inner.provider, *tx_hash).await;
tx_numbers.push(n);
}
// Verify they form a continuous sequence {0, 1, 2}
tx_numbers.sort();
assert_eq!(tx_numbers, vec![0, 1, 2], "TxNumbers should be globally continuous: 0, 1, 2");
// Re-query block 1 txs after block 2 is mined (regression guard)
let tx0_again: Option<Transaction> =
client.request("eth_getTransactionByHash", [tx_hash_0]).await?;
assert!(tx0_again.is_some(), "Block 1 tx should still be queryable after block 2");
Ok(())
}
/// Pending transactions should NOT appear in `RocksDB` until mined.
#[tokio::test]
async fn test_rocksdb_pending_tx_not_in_storage() -> Result<()> {
reth_tracing::init_test_tracing();
let chain_spec = test_chain_spec();
let chain_id = chain_spec.chain().id();
let (mut nodes, _tasks, _) = E2ETestSetupBuilder::<EthereumNode, _>::new(
1,
chain_spec.clone(),
test_attributes_generator,
)
.with_node_config_modifier(with_rocksdb_enabled)
.with_tree_config_modifier(|config| config.with_persistence_threshold(0))
.build()
.await?;
let wallets = wallet::Wallet::new(1).with_chain_id(chain_id).wallet_gen();
let signer = wallets[0].clone();
// Inject tx but do NOT mine
let raw_tx = TransactionTestContext::transfer_tx_bytes(chain_id, signer).await;
let tx_hash = nodes[0].rpc.inject_tx(raw_tx).await?;
// Verify tx is in pending pool via RPC
let client = nodes[0].rpc_client().expect("RPC client");
wait_for_pending_tx(&client, tx_hash).await;
let pending_tx: Option<Transaction> =
client.request("eth_getTransactionByHash", [tx_hash]).await?;
assert!(pending_tx.is_some(), "Pending tx should be visible via RPC");
assert!(pending_tx.unwrap().block_number.is_none(), "Pending tx should have no block_number");
// Assert tx is NOT in RocksDB before mining (single check - tx is confirmed pending)
let rocksdb = nodes[0].inner.provider.rocksdb_provider();
let tx_number: Option<u64> = rocksdb.get::<tables::TransactionHashNumbers>(tx_hash)?;
assert!(
tx_number.is_none(),
"Pending tx should NOT be in RocksDB before mining, but found tx_number={:?}",
tx_number
);
// Now mine the block
let payload = nodes[0].advance_block().await?;
assert_eq!(payload.block().number(), 1);
// Poll until tx appears in RocksDB
let tx_number = poll_tx_in_rocksdb(&nodes[0].inner.provider, tx_hash).await;
assert_eq!(tx_number, 0, "First tx should have tx_number 0");
// Verify tx is now mined via RPC
let mined_tx: Option<Transaction> =
client.request("eth_getTransactionByHash", [tx_hash]).await?;
assert_eq!(mined_tx.expect("mined tx").block_number, Some(1));
Ok(())
}

View File

@@ -50,17 +50,7 @@ pub const DEFAULT_PREWARM_MAX_CONCURRENCY: usize = 16;
const DEFAULT_BLOCK_BUFFER_LIMIT: u32 = EPOCH_SLOTS as u32 * 2;
const DEFAULT_MAX_INVALID_HEADER_CACHE_LENGTH: u32 = 256;
const DEFAULT_MAX_EXECUTE_BLOCK_BATCH_SIZE: usize = 4;
const DEFAULT_CROSS_BLOCK_CACHE_SIZE: usize = default_cross_block_cache_size();
const fn default_cross_block_cache_size() -> usize {
if cfg!(test) {
1024 * 1024 // 1 MB in tests
} else if cfg!(target_pointer_width = "32") {
usize::MAX // max possible on wasm32 / 32-bit
} else {
4 * 1024 * 1024 * 1024 // 4 GB on 64-bit
}
}
const DEFAULT_CROSS_BLOCK_CACHE_SIZE: u64 = 4 * 1024 * 1024 * 1024;
/// Determines if the host has enough parallelism to run the payload processor.
///
@@ -110,10 +100,12 @@ pub struct TreeConfig {
disable_state_cache: bool,
/// Whether to disable parallel prewarming.
disable_prewarming: bool,
/// Whether to disable the parallel sparse trie state root algorithm.
disable_parallel_sparse_trie: bool,
/// Whether to enable state provider metrics.
state_provider_metrics: bool,
/// Cross-block cache size in bytes.
cross_block_cache_size: usize,
cross_block_cache_size: u64,
/// Whether the host has enough parallelism to run state root task.
has_enough_parallelism: bool,
/// Whether multiproof task should chunk proof targets.
@@ -166,6 +158,7 @@ impl Default for TreeConfig {
always_compare_trie_updates: false,
disable_state_cache: false,
disable_prewarming: false,
disable_parallel_sparse_trie: false,
state_provider_metrics: false,
cross_block_cache_size: DEFAULT_CROSS_BLOCK_CACHE_SIZE,
has_enough_parallelism: has_enough_parallelism(),
@@ -198,8 +191,9 @@ impl TreeConfig {
always_compare_trie_updates: bool,
disable_state_cache: bool,
disable_prewarming: bool,
disable_parallel_sparse_trie: bool,
state_provider_metrics: bool,
cross_block_cache_size: usize,
cross_block_cache_size: u64,
has_enough_parallelism: bool,
multiproof_chunking_enabled: bool,
multiproof_chunk_size: usize,
@@ -224,6 +218,7 @@ impl TreeConfig {
always_compare_trie_updates,
disable_state_cache,
disable_prewarming,
disable_parallel_sparse_trie,
state_provider_metrics,
cross_block_cache_size,
has_enough_parallelism,
@@ -304,6 +299,11 @@ impl TreeConfig {
self.state_provider_metrics
}
/// Returns whether or not the parallel sparse trie is disabled.
pub const fn disable_parallel_sparse_trie(&self) -> bool {
self.disable_parallel_sparse_trie
}
/// Returns whether or not state cache is disabled.
pub const fn disable_state_cache(&self) -> bool {
self.disable_state_cache
@@ -321,7 +321,7 @@ impl TreeConfig {
}
/// Returns the cross-block cache size.
pub const fn cross_block_cache_size(&self) -> usize {
pub const fn cross_block_cache_size(&self) -> u64 {
self.cross_block_cache_size
}
@@ -424,7 +424,7 @@ impl TreeConfig {
}
/// Setter for cross block cache size.
pub const fn with_cross_block_cache_size(mut self, cross_block_cache_size: usize) -> Self {
pub const fn with_cross_block_cache_size(mut self, cross_block_cache_size: u64) -> Self {
self.cross_block_cache_size = cross_block_cache_size;
self
}
@@ -441,6 +441,15 @@ impl TreeConfig {
self
}
/// Setter for whether to disable the parallel sparse trie
pub const fn with_disable_parallel_sparse_trie(
mut self,
disable_parallel_sparse_trie: bool,
) -> Self {
self.disable_parallel_sparse_trie = disable_parallel_sparse_trie;
self
}
/// Setter for whether multiproof task should chunk proof targets.
pub const fn with_multiproof_chunking_enabled(
mut self,

View File

@@ -53,7 +53,7 @@ revm-primitives.workspace = true
futures.workspace = true
thiserror.workspace = true
tokio = { workspace = true, features = ["rt", "rt-multi-thread", "sync", "macros"] }
fixed-cache.workspace = true
mini-moka = { workspace = true, features = ["sync"] }
moka = { workspace = true, features = ["sync"] }
smallvec.workspace = true

File diff suppressed because it is too large Load Diff

View File

@@ -22,9 +22,6 @@ pub(crate) struct EngineApiMetrics {
pub(crate) block_validation: BlockValidationMetrics,
/// Canonical chain and reorg related metrics
pub tree: TreeMetrics,
/// Metrics for EIP-7928 Block-Level Access Lists (BAL).
#[allow(dead_code)]
pub(crate) bal: BalMetrics,
}
impl EngineApiMetrics {
@@ -242,8 +239,6 @@ pub(crate) struct NewPayloadStatusMetrics {
pub(crate) new_payload_error: Counter,
/// The total gas of valid new payload messages received.
pub(crate) new_payload_total_gas: Histogram,
/// The gas used for the last valid new payload.
pub(crate) new_payload_total_gas_last: Gauge,
/// The gas per second of valid new payload messages received.
pub(crate) new_payload_gas_per_second: Histogram,
/// The gas per second for the last new payload call.
@@ -256,8 +251,6 @@ pub(crate) struct NewPayloadStatusMetrics {
pub(crate) time_between_new_payloads: Histogram,
/// Time from previous payload start to current payload start (total interval).
pub(crate) new_payload_interval: Histogram,
/// Time diff between forkchoice updated call response and the next new payload call request.
pub(crate) forkchoice_updated_new_payload_time_diff: Histogram,
}
impl NewPayloadStatusMetrics {
@@ -265,7 +258,6 @@ impl NewPayloadStatusMetrics {
pub(crate) fn update_response_metrics(
&mut self,
start: Instant,
latest_forkchoice_updated_at: &mut Option<Instant>,
result: &Result<TreeOutcome<PayloadStatus>, InsertBlockFatalError>,
gas_used: u64,
) {
@@ -285,7 +277,6 @@ impl NewPayloadStatusMetrics {
PayloadStatusEnum::Valid => {
self.new_payload_valid.increment(1);
self.new_payload_total_gas.record(gas_used as f64);
self.new_payload_total_gas_last.set(gas_used as f64);
let gas_per_second = gas_used as f64 / elapsed.as_secs_f64();
self.new_payload_gas_per_second.record(gas_per_second);
self.new_payload_gas_per_second_last.set(gas_per_second);
@@ -299,40 +290,9 @@ impl NewPayloadStatusMetrics {
self.new_payload_messages.increment(1);
self.new_payload_latency.record(elapsed);
self.new_payload_last.set(elapsed);
if let Some(latest_forkchoice_updated_at) = latest_forkchoice_updated_at.take() {
self.forkchoice_updated_new_payload_time_diff
.record(start - latest_forkchoice_updated_at);
}
}
}
/// Metrics for EIP-7928 Block-Level Access Lists (BAL).
///
/// See also <https://github.com/ethereum/execution-metrics/issues/5>
#[allow(dead_code)]
#[derive(Metrics, Clone)]
#[metrics(scope = "execution.block_access_list")]
pub(crate) struct BalMetrics {
/// Size of the BAL in bytes for the current block.
pub(crate) size_bytes: Gauge,
/// Total number of blocks with valid BALs.
pub(crate) valid_total: Counter,
/// Total number of blocks with invalid BALs.
pub(crate) invalid_total: Counter,
/// Time taken to validate the BAL against actual execution.
pub(crate) validation_time_seconds: Histogram,
/// Number of account changes in the BAL.
pub(crate) account_changes: Gauge,
/// Number of storage changes in the BAL.
pub(crate) storage_changes: Gauge,
/// Number of balance changes in the BAL.
pub(crate) balance_changes: Gauge,
/// Number of nonce changes in the BAL.
pub(crate) nonce_changes: Gauge,
/// Number of code changes in the BAL.
pub(crate) code_changes: Gauge,
}
/// Metrics for non-execution related block validation.
#[derive(Metrics, Clone)]
#[metrics(scope = "sync.block_validation")]
@@ -341,8 +301,6 @@ pub(crate) struct BlockValidationMetrics {
pub(crate) state_root_storage_tries_updated_total: Counter,
/// Total number of times the parallel state root computation fell back to regular.
pub(crate) state_root_parallel_fallback_total: Counter,
/// Total number of times the state root task failed but the fallback succeeded.
pub(crate) state_root_task_fallback_success_total: Counter,
/// Latest state root duration, ie the time spent blocked waiting for the state root.
pub(crate) state_root_duration: Gauge,
/// Histogram for state root duration ie the time spent blocked waiting for the state root

View File

@@ -85,12 +85,6 @@ pub mod state;
/// backfill this gap.
pub(crate) const MIN_BLOCKS_FOR_PIPELINE_RUN: u64 = EPOCH_SLOTS;
/// The minimum number of blocks to retain in the changeset cache after eviction.
///
/// This ensures that recent trie changesets are kept in memory for potential reorgs,
/// even when the finalized block is not set (e.g., on L2s like Optimism).
const CHANGESET_CACHE_RETENTION_BLOCKS: u64 = 64;
/// A builder for creating state providers that can be used across threads.
#[derive(Clone, Debug)]
pub struct StateProviderBuilder<N: NodePrimitives, P> {
@@ -1384,27 +1378,19 @@ where
debug!(target: "engine::tree", ?last_persisted_block_hash, ?last_persisted_block_number, elapsed=?start_time.elapsed(), "Finished persisting, calling finish");
self.persistence_state.finish(last_persisted_block_hash, last_persisted_block_number);
// Evict trie changesets for blocks below the eviction threshold.
// Keep at least CHANGESET_CACHE_RETENTION_BLOCKS from the persisted tip, and also respect
// the finalized block if set.
let min_threshold =
last_persisted_block_number.saturating_sub(CHANGESET_CACHE_RETENTION_BLOCKS);
let eviction_threshold =
if let Some(finalized) = self.canonical_in_memory_state.get_finalized_num_hash() {
// Use the minimum of finalized block and retention threshold to be conservative
finalized.number.min(min_threshold)
} else {
// When finalized is not set (e.g., on L2s), use the retention threshold
min_threshold
};
debug!(
target: "engine::tree",
last_persisted = last_persisted_block_number,
finalized_number = ?self.canonical_in_memory_state.get_finalized_num_hash().map(|f| f.number),
eviction_threshold,
"Evicting changesets below threshold"
);
self.changeset_cache.evict(eviction_threshold);
// Evict trie changesets for blocks below the finalized block, but keep at least 64 blocks
if let Some(finalized) = self.canonical_in_memory_state.get_finalized_num_hash() {
let min_threshold = last_persisted_block_number.saturating_sub(64);
let eviction_threshold = finalized.number.min(min_threshold);
debug!(
target: "engine::tree",
last_persisted = last_persisted_block_number,
finalized_number = finalized.number,
eviction_threshold,
"Evicting changesets below threshold"
);
self.changeset_cache.evict(eviction_threshold);
}
self.on_new_persisted_block()?;
Ok(())
@@ -1492,10 +1478,6 @@ where
self.on_maybe_tree_event(res.event.take())?;
}
if let Err(ref err) = output {
error!(target: "engine::tree", %err, ?state, "Error processing forkchoice update");
}
self.metrics.engine.forkchoice_updated.update_response_metrics(
start,
&mut self.metrics.engine.new_payload.latest_finish_at,
@@ -1518,12 +1500,10 @@ where
let gas_used = payload.gas_used();
let num_hash = payload.num_hash();
let mut output = self.on_new_payload(payload);
self.metrics.engine.new_payload.update_response_metrics(
start,
&mut self.metrics.engine.forkchoice_updated.latest_finish_at,
&output,
gas_used,
);
self.metrics
.engine
.new_payload
.update_response_metrics(start, &output, gas_used);
let maybe_event =
output.as_mut().ok().and_then(|out| out.event.take());

View File

@@ -0,0 +1,188 @@
//! Configured sparse trie enum for switching between serial and parallel implementations.
use alloy_primitives::B256;
use reth_trie::{BranchNodeMasks, Nibbles, ProofTrieNode, TrieNode};
use reth_trie_sparse::{
errors::SparseTrieResult, provider::TrieNodeProvider, LeafLookup, LeafLookupError,
SerialSparseTrie, SparseTrieInterface, SparseTrieUpdates,
};
use reth_trie_sparse_parallel::ParallelSparseTrie;
use std::borrow::Cow;
/// Enum for switching between serial and parallel sparse trie implementations.
///
/// This type allows runtime selection between different sparse trie implementations,
/// providing flexibility in choosing the appropriate implementation based on workload
/// characteristics.
#[derive(Debug, Clone)]
pub(crate) enum ConfiguredSparseTrie {
/// Serial implementation of the sparse trie.
Serial(Box<SerialSparseTrie>),
/// Parallel implementation of the sparse trie.
Parallel(Box<ParallelSparseTrie>),
}
impl From<SerialSparseTrie> for ConfiguredSparseTrie {
fn from(trie: SerialSparseTrie) -> Self {
Self::Serial(Box::new(trie))
}
}
impl From<ParallelSparseTrie> for ConfiguredSparseTrie {
fn from(trie: ParallelSparseTrie) -> Self {
Self::Parallel(Box::new(trie))
}
}
impl Default for ConfiguredSparseTrie {
fn default() -> Self {
Self::Serial(Default::default())
}
}
impl SparseTrieInterface for ConfiguredSparseTrie {
fn with_root(
self,
root: TrieNode,
masks: Option<BranchNodeMasks>,
retain_updates: bool,
) -> SparseTrieResult<Self> {
match self {
Self::Serial(trie) => {
trie.with_root(root, masks, retain_updates).map(|t| Self::Serial(Box::new(t)))
}
Self::Parallel(trie) => {
trie.with_root(root, masks, retain_updates).map(|t| Self::Parallel(Box::new(t)))
}
}
}
fn with_updates(self, retain_updates: bool) -> Self {
match self {
Self::Serial(trie) => Self::Serial(Box::new(trie.with_updates(retain_updates))),
Self::Parallel(trie) => Self::Parallel(Box::new(trie.with_updates(retain_updates))),
}
}
fn reserve_nodes(&mut self, additional: usize) {
match self {
Self::Serial(trie) => trie.reserve_nodes(additional),
Self::Parallel(trie) => trie.reserve_nodes(additional),
}
}
fn reveal_node(
&mut self,
path: Nibbles,
node: TrieNode,
masks: Option<BranchNodeMasks>,
) -> SparseTrieResult<()> {
match self {
Self::Serial(trie) => trie.reveal_node(path, node, masks),
Self::Parallel(trie) => trie.reveal_node(path, node, masks),
}
}
fn reveal_nodes(&mut self, nodes: Vec<ProofTrieNode>) -> SparseTrieResult<()> {
match self {
Self::Serial(trie) => trie.reveal_nodes(nodes),
Self::Parallel(trie) => trie.reveal_nodes(nodes),
}
}
fn update_leaf<P: TrieNodeProvider>(
&mut self,
full_path: Nibbles,
value: Vec<u8>,
provider: P,
) -> SparseTrieResult<()> {
match self {
Self::Serial(trie) => trie.update_leaf(full_path, value, provider),
Self::Parallel(trie) => trie.update_leaf(full_path, value, provider),
}
}
fn remove_leaf<P: TrieNodeProvider>(
&mut self,
full_path: &Nibbles,
provider: P,
) -> SparseTrieResult<()> {
match self {
Self::Serial(trie) => trie.remove_leaf(full_path, provider),
Self::Parallel(trie) => trie.remove_leaf(full_path, provider),
}
}
fn root(&mut self) -> B256 {
match self {
Self::Serial(trie) => trie.root(),
Self::Parallel(trie) => trie.root(),
}
}
fn update_subtrie_hashes(&mut self) {
match self {
Self::Serial(trie) => trie.update_subtrie_hashes(),
Self::Parallel(trie) => trie.update_subtrie_hashes(),
}
}
fn get_leaf_value(&self, full_path: &Nibbles) -> Option<&Vec<u8>> {
match self {
Self::Serial(trie) => trie.get_leaf_value(full_path),
Self::Parallel(trie) => trie.get_leaf_value(full_path),
}
}
fn find_leaf(
&self,
full_path: &Nibbles,
expected_value: Option<&Vec<u8>>,
) -> Result<LeafLookup, LeafLookupError> {
match self {
Self::Serial(trie) => trie.find_leaf(full_path, expected_value),
Self::Parallel(trie) => trie.find_leaf(full_path, expected_value),
}
}
fn take_updates(&mut self) -> SparseTrieUpdates {
match self {
Self::Serial(trie) => trie.take_updates(),
Self::Parallel(trie) => trie.take_updates(),
}
}
fn wipe(&mut self) {
match self {
Self::Serial(trie) => trie.wipe(),
Self::Parallel(trie) => trie.wipe(),
}
}
fn clear(&mut self) {
match self {
Self::Serial(trie) => trie.clear(),
Self::Parallel(trie) => trie.clear(),
}
}
fn updates_ref(&self) -> Cow<'_, SparseTrieUpdates> {
match self {
Self::Serial(trie) => trie.updates_ref(),
Self::Parallel(trie) => trie.updates_ref(),
}
}
fn shrink_nodes_to(&mut self, size: usize) {
match self {
Self::Serial(trie) => trie.shrink_nodes_to(size),
Self::Parallel(trie) => trie.shrink_nodes_to(size),
}
}
fn shrink_values_to(&mut self, size: usize) {
match self {
Self::Serial(trie) => trie.shrink_values_to(size),
Self::Parallel(trie) => trie.shrink_values_to(size),
}
}
}

View File

@@ -2,7 +2,10 @@
use super::precompile_cache::PrecompileCacheMap;
use crate::tree::{
cached_state::{CachedStateMetrics, CachedStateProvider, ExecutionCache, SavedCache},
cached_state::{
CachedStateMetrics, CachedStateProvider, ExecutionCache as StateExecutionCache,
ExecutionCacheBuilder, SavedCache,
},
payload_processor::{
prewarm::{PrewarmCacheTask, PrewarmContext, PrewarmMode, PrewarmTaskEvent},
sparse_trie::StateRootComputeOutcome,
@@ -41,7 +44,7 @@ use reth_trie_parallel::{
};
use reth_trie_sparse::{
provider::{TrieNodeProvider, TrieNodeProviderFactory},
ClearedSparseStateTrie, RevealableSparseTrie, SparseStateTrie,
ClearedSparseStateTrie, SparseStateTrie, SparseTrie,
};
use reth_trie_sparse_parallel::{ParallelSparseTrie, ParallelismThresholds};
use std::{
@@ -57,12 +60,15 @@ use std::{
use tracing::{debug, debug_span, instrument, warn, Span};
pub mod bal;
mod configured_sparse_trie;
pub mod executor;
pub mod multiproof;
pub mod prewarm;
pub mod receipt_root_task;
pub mod sparse_trie;
use configured_sparse_trie::ConfiguredSparseTrie;
/// Default parallelism thresholds to use with the [`ParallelSparseTrie`].
///
/// These values were determined by performing benchmarks using gradually increasing values to judge
@@ -110,11 +116,11 @@ where
/// The executor used by to spawn tasks.
executor: WorkloadExecutor,
/// The most recent cache used for execution.
execution_cache: PayloadExecutionCache,
execution_cache: ExecutionCache,
/// Metrics for trie operations
trie_metrics: MultiProofTaskMetrics,
/// Cross-block cache size in bytes.
cross_block_cache_size: usize,
cross_block_cache_size: u64,
/// Whether transactions should not be executed on prewarming task.
disable_transaction_prewarming: bool,
/// Whether state cache should be disable
@@ -128,8 +134,12 @@ where
/// A cleared `SparseStateTrie`, kept around to be reused for the state root computation so
/// that allocations can be minimized.
sparse_state_trie: Arc<
parking_lot::Mutex<Option<ClearedSparseStateTrie<ParallelSparseTrie, ParallelSparseTrie>>>,
parking_lot::Mutex<
Option<ClearedSparseStateTrie<ConfiguredSparseTrie, ConfiguredSparseTrie>>,
>,
>,
/// Whether to disable the parallel sparse trie.
disable_parallel_sparse_trie: bool,
/// Maximum concurrency for prewarm task.
prewarm_max_concurrency: usize,
/// Whether to disable cache metrics recording.
@@ -164,6 +174,7 @@ where
precompile_cache_disabled: config.precompile_cache_disabled(),
precompile_cache_map,
sparse_state_trie: Arc::default(),
disable_parallel_sparse_trie: config.disable_parallel_sparse_trie(),
prewarm_max_concurrency: config.prewarm_max_concurrency(),
disable_cache_metrics: config.disable_cache_metrics(),
}
@@ -302,7 +313,7 @@ where
// Build a state provider for the multiproof task
let provider = provider_builder.build().expect("failed to build provider");
let provider = if let Some(saved_cache) = saved_cache {
let (cache, metrics, _disable_metrics) = saved_cache.split();
let (cache, metrics, _) = saved_cache.split();
Box::new(CachedStateProvider::new(provider, cache, metrics))
as Box<dyn StateProvider>
} else {
@@ -484,11 +495,8 @@ where
cache
} else {
debug!("creating new execution cache on cache miss");
let start = Instant::now();
let cache = ExecutionCache::new(self.cross_block_cache_size);
let metrics = CachedStateMetrics::zeroed();
metrics.record_cache_creation(start.elapsed());
SavedCache::new(parent_hash, cache, metrics)
let cache = ExecutionCacheBuilder::default().build_caches(self.cross_block_cache_size);
SavedCache::new(parent_hash, cache, CachedStateMetrics::zeroed())
.with_disable_cache_metrics(self.disable_cache_metrics)
}
}
@@ -506,6 +514,7 @@ where
BPF::StorageNodeProvider: TrieNodeProvider + Send + Sync,
{
let cleared_sparse_trie = Arc::clone(&self.sparse_state_trie);
let disable_parallel_sparse_trie = self.disable_parallel_sparse_trie;
let trie_metrics = self.trie_metrics.clone();
let span = Span::current();
@@ -515,10 +524,14 @@ where
// Reuse a stored SparseStateTrie, or create a new one using the desired configuration
// if there's none to reuse.
let sparse_state_trie = cleared_sparse_trie.lock().take().unwrap_or_else(|| {
let default_trie = RevealableSparseTrie::blind_from(
ParallelSparseTrie::default()
.with_parallelism_thresholds(PARALLEL_SPARSE_TRIE_PARALLELISM_THRESHOLDS),
);
let default_trie = SparseTrie::blind_from(if disable_parallel_sparse_trie {
ConfiguredSparseTrie::Serial(Default::default())
} else {
ConfiguredSparseTrie::Parallel(Box::new(
ParallelSparseTrie::default()
.with_parallelism_thresholds(PARALLEL_SPARSE_TRIE_PARALLELISM_THRESHOLDS),
))
});
ClearedSparseStateTrie::from_state_trie(
SparseStateTrie::new()
.with_accounts_trie(default_trie.clone())
@@ -527,13 +540,12 @@ where
)
});
let task =
SparseTrieTask::<_, ParallelSparseTrie, ParallelSparseTrie>::new_with_cleared_trie(
sparse_trie_rx,
proof_worker_handle,
trie_metrics,
sparse_state_trie,
);
let task = SparseTrieTask::<_, ConfiguredSparseTrie, ConfiguredSparseTrie>::new_with_cleared_trie(
sparse_trie_rx,
proof_worker_handle,
trie_metrics,
sparse_state_trie,
);
let (result, trie) = task.run();
// Send state root computation result
@@ -575,27 +587,28 @@ where
parent_hash = %block_with_parent.parent,
"Cannot find cache for parent hash, skip updating cache with new state for inserted executed block",
);
return
return;
}
// Take existing cache (if any) or create fresh caches
let (caches, cache_metrics, _) = match cached.take() {
Some(existing) => existing.split(),
let (caches, cache_metrics) = match cached.take() {
Some(existing) => {
let (c, m, _) = existing.split();
(c, m)
}
None => (
ExecutionCache::new(self.cross_block_cache_size),
ExecutionCacheBuilder::default().build_caches(self.cross_block_cache_size),
CachedStateMetrics::zeroed(),
false,
),
};
// Insert the block's bundle state into cache
let new_cache =
SavedCache::new(block_with_parent.block.hash, caches, cache_metrics)
.with_disable_cache_metrics(disable_cache_metrics);
let new_cache = SavedCache::new(block_with_parent.block.hash, caches, cache_metrics)
.with_disable_cache_metrics(disable_cache_metrics);
if new_cache.cache().insert_state(bundle_state).is_err() {
*cached = None;
debug!(target: "engine::caching", "cleared execution cache on update error");
return
return;
}
new_cache.update_metrics();
@@ -659,7 +672,7 @@ impl<Tx, Err, R: Send + Sync + 'static> PayloadHandle<Tx, Err, R> {
}
/// Returns a clone of the caches used by prewarming
pub(super) fn caches(&self) -> Option<ExecutionCache> {
pub(super) fn caches(&self) -> Option<StateExecutionCache> {
self.prewarm_handle.saved_cache.as_ref().map(|cache| cache.cache().clone())
}
@@ -763,29 +776,29 @@ impl<R> Drop for CacheTaskHandle<R> {
/// ## Cache Safety
///
/// **CRITICAL**: Cache update operations require exclusive access. All concurrent cache users
/// (such as prewarming tasks) must be terminated before calling
/// [`PayloadExecutionCache::update_with_guard`], otherwise the cache may be corrupted or cleared.
/// (such as prewarming tasks) must be terminated before calling `update_with_guard`, otherwise
/// the cache may be corrupted or cleared.
///
/// ## Cache vs Prewarming Distinction
///
/// **[`PayloadExecutionCache`]**:
/// **`ExecutionCache`**:
/// - Stores parent block's execution state after completion
/// - Used to fetch parent data for next block's execution
/// - Must be exclusively accessed during save operations
///
/// **[`PrewarmCacheTask`]**:
/// **`PrewarmCacheTask`**:
/// - Speculatively loads accounts/storage that might be used in transaction execution
/// - Prepares data for state root proof computation
/// - Runs concurrently but must not interfere with cache saves
#[derive(Clone, Debug, Default)]
struct PayloadExecutionCache {
struct ExecutionCache {
/// Guarded cloneable cache identified by a block hash.
inner: Arc<RwLock<Option<SavedCache>>>,
/// Metrics for cache operations.
metrics: ExecutionCacheMetrics,
}
impl PayloadExecutionCache {
impl ExecutionCache {
/// Returns the cache for `parent_hash` if it's available for use.
///
/// A cache is considered available when:
@@ -821,15 +834,11 @@ impl PayloadExecutionCache {
"Existing cache found"
);
if available {
// If the has is available (no other threads are using it), but has a mismatching
// parent hash, we can just clear it and keep using without re-creating from
// scratch.
if !hash_matches {
c.clear();
}
return Some(c.clone())
} else if hash_matches {
if hash_matches && available {
return Some(c.clone());
}
if hash_matches && !available {
self.metrics.execution_cache_in_use.increment(1);
}
} else {
@@ -902,9 +911,9 @@ where
#[cfg(test)]
mod tests {
use super::PayloadExecutionCache;
use super::ExecutionCache;
use crate::tree::{
cached_state::{CachedStateMetrics, ExecutionCache, SavedCache},
cached_state::{CachedStateMetrics, ExecutionCacheBuilder, SavedCache},
payload_processor::{
evm_state_to_hashed_post_state, executor::WorkloadExecutor, PayloadProcessor,
},
@@ -934,13 +943,13 @@ mod tests {
use std::sync::Arc;
fn make_saved_cache(hash: B256) -> SavedCache {
let execution_cache = ExecutionCache::new(1_000);
let execution_cache = ExecutionCacheBuilder::default().build_caches(1_000);
SavedCache::new(hash, execution_cache, CachedStateMetrics::zeroed())
}
#[test]
fn execution_cache_allows_single_checkout() {
let execution_cache = PayloadExecutionCache::default();
let execution_cache = ExecutionCache::default();
let hash = B256::from([1u8; 32]);
execution_cache.update_with_guard(|slot| *slot = Some(make_saved_cache(hash)));
@@ -959,7 +968,7 @@ mod tests {
#[test]
fn execution_cache_checkout_releases_on_drop() {
let execution_cache = PayloadExecutionCache::default();
let execution_cache = ExecutionCache::default();
let hash = B256::from([2u8; 32]);
execution_cache.update_with_guard(|slot| *slot = Some(make_saved_cache(hash)));
@@ -975,21 +984,19 @@ mod tests {
}
#[test]
fn execution_cache_mismatch_parent_clears_and_returns() {
let execution_cache = PayloadExecutionCache::default();
fn execution_cache_mismatch_parent_returns_none() {
let execution_cache = ExecutionCache::default();
let hash = B256::from([3u8; 32]);
execution_cache.update_with_guard(|slot| *slot = Some(make_saved_cache(hash)));
// When the parent hash doesn't match, the cache is cleared and returned for reuse
let different_hash = B256::from([4u8; 32]);
let cache = execution_cache.get_cache_for(different_hash);
assert!(cache.is_some(), "cache should be returned for reuse after clearing")
let miss = execution_cache.get_cache_for(B256::from([4u8; 32]));
assert!(miss.is_none(), "checkout should fail for different parent hash");
}
#[test]
fn execution_cache_update_after_release_succeeds() {
let execution_cache = PayloadExecutionCache::default();
let execution_cache = ExecutionCache::default();
let initial = B256::from([5u8; 32]);
execution_cache.update_with_guard(|slot| *slot = Some(make_saved_cache(initial)));

View File

@@ -777,21 +777,10 @@ impl MultiProofTask {
// [`MultiAddedRemovedKeys`]. Even if there are not any known removed keys for the account,
// we still want to optimistically fetch extension children for the leaf addition case.
// V2 multiproofs don't need this.
//
// Only clone the AddedRemovedKeys for accounts in the targets, not the entire accumulated
// set, to avoid O(n) cloning with many buffered blocks.
let multi_added_removed_keys =
if let VersionedMultiProofTargets::Legacy(legacy_targets) = &targets {
self.multi_added_removed_keys.touch_accounts(legacy_targets.keys().copied());
Some(Arc::new(MultiAddedRemovedKeys {
account: self.multi_added_removed_keys.account.clone(),
storages: legacy_targets
.keys()
.filter_map(|k| {
self.multi_added_removed_keys.storages.get(k).map(|v| (*k, v.clone()))
})
.collect(),
}))
Some(Arc::new(self.multi_added_removed_keys.clone()))
} else {
None
};
@@ -1527,9 +1516,8 @@ where
#[cfg(test)]
mod tests {
use crate::tree::cached_state::CachedStateProvider;
use super::*;
use crate::tree::cached_state::{CachedStateProvider, ExecutionCacheBuilder};
use alloy_eip7928::{AccountChanges, BalanceChange};
use alloy_primitives::Address;
use reth_provider::{
@@ -1589,7 +1577,7 @@ mod tests {
{
let db_provider = factory.database_provider_ro().unwrap();
let state_provider: StateProviderBox = Box::new(LatestStateProvider::new(db_provider));
let cache = crate::tree::cached_state::ExecutionCache::new(1000);
let cache = ExecutionCacheBuilder::default().build_caches(1000);
CachedStateProvider::new(state_provider, cache, Default::default())
}

View File

@@ -17,16 +17,17 @@ use crate::tree::{
bal::{total_slots, BALSlotIter},
executor::WorkloadExecutor,
multiproof::{MultiProofMessage, VersionedMultiProofTargets},
PayloadExecutionCache,
ExecutionCache as PayloadExecutionCache,
},
precompile_cache::{CachedPrecompile, PrecompileCacheMap},
ExecutionEnv, StateProviderBuilder,
};
use alloy_consensus::transaction::TxHashRef;
use alloy_eip7928::BlockAccessList;
use alloy_eips::Typed2718;
use alloy_evm::Database;
use alloy_primitives::{keccak256, map::B256Set, B256};
use crossbeam_channel::{Receiver as CrossbeamReceiver, Sender as CrossbeamSender};
use crossbeam_channel::Sender as CrossbeamSender;
use metrics::{Counter, Gauge, Histogram};
use reth_evm::{execute::ExecutableTxFor, ConfigureEvm, Evm, EvmFor, RecoveredTx, SpecFor};
use reth_metrics::Metrics;
@@ -65,6 +66,19 @@ struct IndexedTransaction<Tx> {
tx: Tx,
}
/// Maximum standard Ethereum transaction type value.
///
/// Standard transaction types are:
/// - Type 0: Legacy transactions (original Ethereum)
/// - Type 1: EIP-2930 (access list transactions)
/// - Type 2: EIP-1559 (dynamic fee transactions)
/// - Type 3: EIP-4844 (blob transactions)
/// - Type 4: EIP-7702 (set code authorization transactions)
///
/// Any transaction with a type > 4 is considered a non-standard/system transaction,
/// typically used by L2s for special purposes (e.g., Optimism deposit transactions use type 126).
const MAX_STANDARD_TX_TYPE: u8 = 4;
/// A task that is responsible for caching and prewarming the cache by executing transactions
/// individually in parallel.
///
@@ -163,8 +177,8 @@ where
transaction_count_hint.min(max_concurrency)
};
// Spawn workers
let tx_sender = ctx.clone().spawn_workers(workers_needed, &executor, actions_tx.clone(), done_tx.clone());
// Initialize worker handles container
let handles = ctx.clone().spawn_workers(workers_needed, &executor, actions_tx.clone(), done_tx.clone());
// Distribute transactions to workers
let mut tx_index = 0usize;
@@ -179,18 +193,37 @@ where
}
let indexed_tx = IndexedTransaction { index: tx_index, tx };
let is_system_tx = indexed_tx.tx.tx().ty() > MAX_STANDARD_TX_TYPE;
// Send transaction to the workers
// Ignore send errors: workers listen to terminate_execution and may
// exit early when signaled.
let _ = tx_sender.send(indexed_tx);
// System transactions (type > 4) in the first position set critical metadata
// that affects all subsequent transactions (e.g., L1 block info on L2s).
// Broadcast the first system transaction to all workers to ensure they have
// the critical state. This is particularly important for L2s like Optimism
// where the first deposit transaction (type 126) contains essential block metadata.
if tx_index == 0 && is_system_tx {
for handle in &handles {
// Ignore send errors: workers listen to terminate_execution and may
// exit early when signaled. Sending to a disconnected worker is
// possible and harmless and should happen at most once due to
// the terminate_execution check above.
let _ = handle.send(indexed_tx.clone());
}
} else {
// Round-robin distribution for all other transactions
let worker_idx = tx_index % workers_needed;
// Ignore send errors: workers listen to terminate_execution and may
// exit early when signaled. Sending to a disconnected worker is
// possible and harmless and should happen at most once due to
// the terminate_execution check above.
let _ = handles[worker_idx].send(indexed_tx);
}
tx_index += 1;
}
// drop sender and wait for all tasks to finish
// drop handle and wait for all tasks to finish and drop theirs
drop(done_tx);
drop(tx_sender);
drop(handles);
while done_rx.recv().is_ok() {}
let _ = actions_tx
@@ -529,7 +562,7 @@ where
Some((evm, metrics, terminate_execution, v2_proofs_enabled))
}
/// Accepts a [`CrossbeamReceiver`] of transactions and a handle to prewarm task. Executes
/// Accepts an [`mpsc::Receiver`] of transactions and a handle to prewarm task. Executes
/// transactions and streams [`PrewarmTaskEvent::Outcome`] messages for each transaction.
///
/// This function processes transactions sequentially from the receiver and emits outcome events
@@ -541,7 +574,7 @@ where
#[instrument(level = "debug", target = "engine::tree::payload_processor::prewarm", skip_all)]
fn transact_batch<Tx>(
self,
txs: CrossbeamReceiver<IndexedTransaction<Tx>>,
txs: mpsc::Receiver<IndexedTransaction<Tx>>,
sender: Sender<PrewarmTaskEvent<N::Receipt>>,
done_tx: Sender<()>,
) where
@@ -628,31 +661,35 @@ where
let _ = done_tx.send(());
}
/// Spawns worker tasks that pull transactions from a shared channel.
///
/// Returns the sender for distributing transactions to workers.
/// Spawns a worker task for transaction execution and returns its sender channel.
fn spawn_workers<Tx>(
self,
workers_needed: usize,
task_executor: &WorkloadExecutor,
actions_tx: Sender<PrewarmTaskEvent<N::Receipt>>,
done_tx: Sender<()>,
) -> CrossbeamSender<IndexedTransaction<Tx>>
) -> Vec<mpsc::Sender<IndexedTransaction<Tx>>>
where
Tx: ExecutableTxFor<Evm> + Send + 'static,
{
let (tx_sender, tx_receiver) = crossbeam_channel::unbounded();
let mut handles = Vec::with_capacity(workers_needed);
let mut receivers = Vec::with_capacity(workers_needed);
// Spawn workers that all pull from the shared receiver
for _ in 0..workers_needed {
let (tx, rx) = mpsc::channel();
handles.push(tx);
receivers.push(rx);
}
// Spawn a separate task spawning workers in parallel.
let executor = task_executor.clone();
let span = Span::current();
task_executor.spawn_blocking(move || {
let _enter = span.entered();
for idx in 0..workers_needed {
for (idx, rx) in receivers.into_iter().enumerate() {
let ctx = self.clone();
let actions_tx = actions_tx.clone();
let done_tx = done_tx.clone();
let rx = tx_receiver.clone();
let span = debug_span!(target: "engine::tree::payload_processor::prewarm", "prewarm worker", idx);
executor.spawn_blocking(move || {
let _enter = span.entered();
@@ -661,7 +698,7 @@ where
}
});
tx_sender
handles
}
/// Spawns a worker task for BAL slot prefetching.

View File

@@ -8,7 +8,7 @@ use reth_trie_parallel::{proof_task::ProofResult, root::ParallelStateRootError};
use reth_trie_sparse::{
errors::{SparseStateTrieResult, SparseTrieErrorKind},
provider::{TrieNodeProvider, TrieNodeProviderFactory},
ClearedSparseStateTrie, SerialSparseTrie, SparseStateTrie, SparseTrie,
ClearedSparseStateTrie, SerialSparseTrie, SparseStateTrie, SparseTrieInterface,
};
use smallvec::SmallVec;
use std::{
@@ -38,8 +38,8 @@ where
BPF: TrieNodeProviderFactory + Send + Sync + Clone,
BPF::AccountNodeProvider: TrieNodeProvider + Send + Sync,
BPF::StorageNodeProvider: TrieNodeProvider + Send + Sync,
A: SparseTrie + Send + Sync + Default,
S: SparseTrie + Send + Sync + Default + Clone,
A: SparseTrieInterface + Send + Sync + Default,
S: SparseTrieInterface + Send + Sync + Default + Clone,
{
/// Creates a new sparse trie, pre-populating with a [`ClearedSparseStateTrie`].
pub(super) fn new_with_cleared_trie(
@@ -150,8 +150,8 @@ where
BPF: TrieNodeProviderFactory + Send + Sync,
BPF::AccountNodeProvider: TrieNodeProvider + Send + Sync,
BPF::StorageNodeProvider: TrieNodeProvider + Send + Sync,
A: SparseTrie + Send + Sync + Default,
S: SparseTrie + Send + Sync + Default + Clone,
A: SparseTrieInterface + Send + Sync + Default,
S: SparseTrieInterface + Send + Sync + Default + Clone,
{
trace!(target: "engine::root::sparse", "Updating sparse trie");
let started_at = Instant::now();

View File

@@ -503,7 +503,6 @@ where
let root_time = Instant::now();
let mut maybe_state_root = None;
let mut state_root_task_failed = false;
match strategy {
StateRootStrategy::StateRootTask => {
@@ -522,12 +521,10 @@ where
block_state_root = ?block.header().state_root(),
"State root task returned incorrect state root"
);
state_root_task_failed = true;
}
}
Err(error) => {
debug!(target: "engine::tree::payload_validator", %error, "State root task failed");
state_root_task_failed = true;
}
}
}
@@ -572,11 +569,6 @@ where
self.compute_state_root_serial(overlay_factory.clone(), &hashed_state),
block
);
if state_root_task_failed {
self.metrics.block_validation.state_root_task_fallback_success_total.increment(1);
}
(root, updates, root_time.elapsed())
};

View File

@@ -52,7 +52,7 @@ pub fn read_dir(
checksums.next().transpose()?.ok_or_eyre("Got less checksums than ERA files")?;
}
entries.sort_by_key(|(left, _)| *left);
entries.sort_by(|(left, _), (right, _)| left.cmp(right));
Ok(stream::iter(entries.into_iter().skip_while(move |(n, _)| *n < start_index).map(
move |(_, path)| {

View File

@@ -20,7 +20,6 @@ use reth_era::{
},
};
use reth_fs_util as fs;
use reth_primitives_traits::Block;
use reth_storage_api::{BlockNumReader, BlockReader, HeaderProvider};
use std::{
path::PathBuf,
@@ -296,11 +295,9 @@ where
return Err(eyre!("Expected block {expected_block_number}, got {actual_block_number}"));
}
// CompressedBody must contain the block *body* (rlp(body)), not the full block (rlp(block)).
let body = provider
.block_by_number(actual_block_number)?
.ok_or_else(|| eyre!("Block not found for block {}", actual_block_number))?
.into_body();
.ok_or_else(|| eyre!("Block body not found for block {}", actual_block_number))?;
let receipts = provider
.receipts_by_block(actual_block_number.into())?

View File

@@ -59,7 +59,6 @@ std = [
"reth-storage-errors/std",
]
test-utils = [
"std",
"dep:parking_lot",
"dep:derive_more",
"reth-chainspec/test-utils",

View File

@@ -456,22 +456,17 @@ pub struct EthereumPoolBuilder {
// TODO add options for txpool args
}
impl<Types, Node, Evm> PoolBuilder<Node, Evm> for EthereumPoolBuilder
impl<Types, Node> PoolBuilder<Node> for EthereumPoolBuilder
where
Types: NodeTypes<
ChainSpec: EthereumHardforks,
Primitives: NodePrimitives<SignedTx = TransactionSigned>,
>,
Node: FullNodeTypes<Types = Types>,
Evm: ConfigureEvm<Primitives = PrimitivesTy<Types>> + Clone + 'static,
{
type Pool = EthTransactionPool<Node::Provider, DiskFileBlobStore, Evm>;
type Pool = EthTransactionPool<Node::Provider, DiskFileBlobStore>;
async fn build_pool(
self,
ctx: &BuilderContext<Node>,
evm_config: Evm,
) -> eyre::Result<Self::Pool> {
async fn build_pool(self, ctx: &BuilderContext<Node>) -> eyre::Result<Self::Pool> {
let pool_config = ctx.pool_config();
let blobs_disabled = ctx.config().txpool.disable_blobs_support ||
@@ -497,17 +492,17 @@ where
let blob_store =
reth_node_builder::components::create_blob_store_with_cache(ctx, blob_cache_size)?;
let validator =
TransactionValidationTaskExecutor::eth_builder(ctx.provider().clone(), evm_config)
.set_eip4844(!blobs_disabled)
.kzg_settings(ctx.kzg_settings()?)
.with_max_tx_input_bytes(ctx.config().txpool.max_tx_input_bytes)
.with_local_transactions_config(pool_config.local_transactions_config.clone())
.set_tx_fee_cap(ctx.config().rpc.rpc_tx_fee_cap)
.with_max_tx_gas_limit(ctx.config().txpool.max_tx_gas_limit)
.with_minimum_priority_fee(ctx.config().txpool.minimum_priority_fee)
.with_additional_tasks(ctx.config().txpool.additional_validation_tasks)
.build_with_tasks(ctx.task_executor().clone(), blob_store.clone());
let validator = TransactionValidationTaskExecutor::eth_builder(ctx.provider().clone())
.with_head_timestamp(ctx.head().timestamp)
.set_eip4844(!blobs_disabled)
.kzg_settings(ctx.kzg_settings()?)
.with_max_tx_input_bytes(ctx.config().txpool.max_tx_input_bytes)
.with_local_transactions_config(pool_config.local_transactions_config.clone())
.set_tx_fee_cap(ctx.config().rpc.rpc_tx_fee_cap)
.with_max_tx_gas_limit(ctx.config().txpool.max_tx_gas_limit)
.with_minimum_priority_fee(ctx.config().txpool.minimum_priority_fee)
.with_additional_tasks(ctx.config().txpool.additional_validation_tasks)
.build_with_tasks(ctx.task_executor().clone(), blob_store.clone());
if validator.validator().eip4844() {
// initializing the KZG settings can be expensive, this should be done upfront so that

View File

@@ -9,7 +9,6 @@ mod p2p;
mod pool;
mod prestate;
mod rpc;
mod selfdestruct;
mod utils;
const fn main() {}

View File

@@ -1,529 +0,0 @@
//! E2E tests for SELFDESTRUCT behavior and output state verification.
//!
//! These tests verify that:
//! - Pre-Dencun: SELFDESTRUCT clears storage and code, output state reflects this
//! - Post-Dencun (EIP-6780): SELFDESTRUCT only works in same-tx creation, state persists
//!
//! We disable prewarming to ensure deterministic cache behavior and verify the execution
//! output state contains the expected account status after SELFDESTRUCT.
use crate::utils::{eth_payload_attributes, eth_payload_attributes_shanghai};
use alloy_network::{EthereumWallet, TransactionBuilder};
use alloy_primitives::{bytes, Address, Bytes, TxKind, U256};
use alloy_provider::{Provider, ProviderBuilder};
use alloy_rpc_types_eth::TransactionRequest;
use futures::StreamExt;
use reth_chainspec::{ChainSpec, ChainSpecBuilder, MAINNET};
use reth_e2e_test_utils::setup_engine;
use reth_node_api::TreeConfig;
use reth_node_ethereum::EthereumNode;
use reth_revm::db::BundleAccount;
use std::sync::Arc;
const MAX_FEE_PER_GAS: u128 = 20_000_000_000;
const MAX_PRIORITY_FEE_PER_GAS: u128 = 1_000_000_000;
fn cancun_spec() -> Arc<ChainSpec> {
Arc::new(
ChainSpecBuilder::default()
.chain(MAINNET.chain)
.genesis(serde_json::from_str(include_str!("../assets/genesis.json")).unwrap())
.cancun_activated()
.build(),
)
}
fn shanghai_spec() -> Arc<ChainSpec> {
Arc::new(
ChainSpecBuilder::default()
.chain(MAINNET.chain)
.genesis(serde_json::from_str(include_str!("../assets/genesis.json")).unwrap())
.shanghai_activated()
.build(),
)
}
fn deploy_tx(from: Address, nonce: u64, init_code: Bytes) -> TransactionRequest {
TransactionRequest::default()
.with_from(from)
.with_nonce(nonce)
.with_gas_limit(500_000)
.with_max_fee_per_gas(MAX_FEE_PER_GAS)
.with_max_priority_fee_per_gas(MAX_PRIORITY_FEE_PER_GAS)
.with_input(init_code)
.with_kind(TxKind::Create)
}
fn call_tx(from: Address, to: Address, nonce: u64) -> TransactionRequest {
TransactionRequest::default()
.with_from(from)
.with_to(to)
.with_nonce(nonce)
.with_gas_limit(100_000)
.with_max_fee_per_gas(MAX_FEE_PER_GAS)
.with_max_priority_fee_per_gas(MAX_PRIORITY_FEE_PER_GAS)
}
fn transfer_tx(from: Address, to: Address, nonce: u64, value: U256) -> TransactionRequest {
TransactionRequest::default()
.with_from(from)
.with_to(to)
.with_nonce(nonce)
.with_value(value)
.with_gas_limit(21_000)
.with_max_fee_per_gas(MAX_FEE_PER_GAS)
.with_max_priority_fee_per_gas(MAX_PRIORITY_FEE_PER_GAS)
}
/// Creates init code for a contract that selfdestructs during deployment (same tx).
/// This tests the EIP-6780 exception where SELFDESTRUCT in same tx as creation still works.
///
/// The contract:
/// 1. Stores 0x42 at slot 0
/// 2. Immediately selfdestructs to beneficiary (during init, before returning runtime)
fn selfdestruct_in_constructor_init_code() -> Bytes {
// Init code that selfdestructs during deployment:
// PUSH1 0x42, PUSH1 0x00, SSTORE (store 0x42 at slot 0)
// PUSH20 <beneficiary>, SELFDESTRUCT
let mut init = Vec::new();
init.extend_from_slice(&[0x60, 0x42, 0x60, 0x00, 0x55]); // PUSH1 0x42, PUSH1 0x00, SSTORE
init.extend_from_slice(&[
0x73, // PUSH20
0xde, 0xad, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
0x00, 0x00, 0x00, 0x00, 0x01, // beneficiary address
]);
init.push(0xff); // SELFDESTRUCT
Bytes::from(init)
}
/// Creates init code for a simple contract that:
/// 1. Stores 0x42 at slot 0 during deployment
/// 2. On any call: selfdestructs to beneficiary
///
/// This simpler contract avoids complex branching logic.
fn selfdestruct_contract_init_code() -> Bytes {
// Runtime: just selfdestruct on any call
// PUSH20 <beneficiary>
// SELFDESTRUCT
let runtime = bytes!(
"73dead000000000000000000000000000000000001" // PUSH20 beneficiary
"ff" // SELFDESTRUCT
);
let runtime_len = runtime.len(); // 22 bytes
// Init code: SSTORE(0, 0x42), CODECOPY, RETURN
// Total init code before runtime = 17 bytes
let init_len: u8 = 17;
let mut init = Vec::new();
init.extend_from_slice(&[0x60, 0x42, 0x60, 0x00, 0x55]); // PUSH1 0x42, PUSH1 0x00, SSTORE
init.extend_from_slice(&[0x60, runtime_len as u8, 0x60, init_len, 0x60, 0x00, 0x39]); // CODECOPY
init.extend_from_slice(&[0x60, runtime_len as u8, 0x60, 0x00, 0xf3]); // RETURN
init.extend_from_slice(&runtime);
Bytes::from(init)
}
/// Tests SELFDESTRUCT behavior post-Dencun (Cancun+).
///
/// Post-Dencun (EIP-6780):
/// - SELFDESTRUCT only deletes contract if called in same tx as creation
/// - For existing contracts, SELFDESTRUCT only sends balance, code/storage persist
/// - The output state should NOT mark the account as destroyed
///
/// This test verifies:
/// 1. Contract deploys with storage
/// 2. SELFDESTRUCT in later tx does NOT delete code/storage
/// 3. Output state shows account is NOT destroyed
#[tokio::test]
async fn test_selfdestruct_post_dencun() -> eyre::Result<()> {
reth_tracing::init_test_tracing();
let tree_config = TreeConfig::default().without_prewarming(true).without_state_cache(false);
let (mut nodes, _tasks, wallet) =
setup_engine::<EthereumNode>(1, cancun_spec(), false, tree_config, eth_payload_attributes)
.await?;
let mut node = nodes.pop().unwrap();
let signer = wallet.inner.clone();
let provider = ProviderBuilder::new()
.wallet(EthereumWallet::new(signer.clone()))
.connect_http(node.rpc_url());
// Deploy contract that stores 0x42 at slot 0 and selfdestructs on any call
let pending = provider
.send_transaction(deploy_tx(signer.address(), 0, selfdestruct_contract_init_code()))
.await?;
node.advance_block().await?;
let receipt = pending.get_receipt().await?;
assert!(receipt.status(), "Contract deployment should succeed");
let contract_address = receipt.contract_address.expect("Should have contract address");
// Consume the canonical notification for deployment block
let _ = node.canonical_stream.next().await;
// Trigger SELFDESTRUCT by calling the contract
let pending = provider.send_transaction(call_tx(signer.address(), contract_address, 1)).await?;
node.advance_block().await?;
let receipt = pending.get_receipt().await?;
assert!(receipt.status(), "Selfdestruct tx should succeed");
// Get the canonical notification for the selfdestruct block
let notification = node.canonical_stream.next().await.unwrap();
let chain = notification.committed();
let execution_outcome = chain.execution_outcome();
// Verify the output state: post-Dencun, account should NOT be destroyed
let account_state: Option<&BundleAccount> = execution_outcome.bundle.account(&contract_address);
assert!(
account_state.is_none() || !account_state.unwrap().was_destroyed(),
"Post-Dencun (EIP-6780): Account should NOT be destroyed when SELFDESTRUCT called on existing contract"
);
// Verify via RPC that code and storage persist
let code_after = provider.get_code_at(contract_address).await?;
assert!(!code_after.is_empty(), "Post-Dencun: Contract code should persist");
let slot0_after = provider.get_storage_at(contract_address, U256::ZERO).await?;
assert_eq!(slot0_after, U256::from(0x42), "Post-Dencun: Storage should persist");
// Send another transaction to the contract address in a new block.
// This tests cache behavior - if cache has stale data, execution would be incorrect.
// Post-Dencun: calling the contract should trigger SELFDESTRUCT again (but only transfer
// balance)
let pending = provider.send_transaction(call_tx(signer.address(), contract_address, 2)).await?;
node.advance_block().await?;
let receipt = pending.get_receipt().await?;
assert!(receipt.status(), "Second call to contract should succeed");
// Consume the canonical notification
let notification = node.canonical_stream.next().await.unwrap();
let chain = notification.committed();
let execution_outcome = chain.execution_outcome();
// Verify the output state still shows account NOT destroyed
let account_state: Option<&BundleAccount> = execution_outcome.bundle.account(&contract_address);
assert!(
account_state.is_none() || !account_state.unwrap().was_destroyed(),
"Post-Dencun: Account should still NOT be destroyed after second SELFDESTRUCT call"
);
// Verify code and storage still persist after the second call
let code_final = provider.get_code_at(contract_address).await?;
assert!(!code_final.is_empty(), "Post-Dencun: Contract code should still persist");
let slot0_final = provider.get_storage_at(contract_address, U256::ZERO).await?;
assert_eq!(slot0_final, U256::from(0x42), "Post-Dencun: Storage should still persist");
Ok(())
}
/// Tests SELFDESTRUCT in same transaction as creation (post-Dencun).
///
/// Post-Dencun (EIP-6780):
/// - SELFDESTRUCT during the same transaction as creation DOES delete the contract
/// - This is the exception to the rule that SELFDESTRUCT no longer deletes contracts
///
/// This test verifies:
/// 1. Contract selfdestructs during its constructor
/// 2. Contract is deleted (same-tx exception applies)
/// 3. No code or storage remains
/// 4. Since account never existed in DB before, bundle has no entry for it
#[tokio::test]
async fn test_selfdestruct_same_tx_post_dencun() -> eyre::Result<()> {
reth_tracing::init_test_tracing();
let tree_config = TreeConfig::default().without_prewarming(true).without_state_cache(false);
let (mut nodes, _tasks, wallet) =
setup_engine::<EthereumNode>(1, cancun_spec(), false, tree_config, eth_payload_attributes)
.await?;
let mut node = nodes.pop().unwrap();
let signer = wallet.inner.clone();
let provider = ProviderBuilder::new()
.wallet(EthereumWallet::new(signer.clone()))
.connect_http(node.rpc_url());
// Deploy contract that selfdestructs during its constructor
let pending = provider
.send_transaction(deploy_tx(signer.address(), 0, selfdestruct_in_constructor_init_code()))
.await?;
node.advance_block().await?;
let receipt = pending.get_receipt().await?;
assert!(receipt.status(), "Contract deployment with selfdestruct should succeed");
// Calculate the contract address (CREATE uses sender + nonce)
let contract_address = signer.address().create(0);
// Get the canonical notification for the deployment block
let notification = node.canonical_stream.next().await.unwrap();
let chain = notification.committed();
let execution_outcome = chain.execution_outcome();
// Verify the output state: same-tx SELFDESTRUCT should destroy the account
let account_state: Option<&BundleAccount> = execution_outcome.bundle.account(&contract_address);
assert!(
account_state.is_none(),
"Post-Dencun same-tx: Account was created and selfdestructed in the same transaction, no trace in bundle state"
);
// Verify via RPC that code and storage are cleared
let code = provider.get_code_at(contract_address).await?;
assert!(code.is_empty(), "Post-Dencun same-tx: Contract code should be deleted");
let slot0 = provider.get_storage_at(contract_address, U256::ZERO).await?;
assert_eq!(slot0, U256::ZERO, "Post-Dencun same-tx: Storage should be cleared");
// Send ETH to the destroyed address in a new block to test cache behavior
let pending = provider
.send_transaction(transfer_tx(signer.address(), contract_address, 1, U256::from(1000)))
.await?;
node.advance_block().await?;
let receipt = pending.get_receipt().await?;
assert!(receipt.status(), "ETH transfer to destroyed address should succeed");
// Consume the canonical notification
let _ = node.canonical_stream.next().await;
// Verify code is still empty and account received ETH
let code_final = provider.get_code_at(contract_address).await?;
assert!(code_final.is_empty(), "Post-Dencun same-tx: Contract code should remain deleted");
let balance = provider.get_balance(contract_address).await?;
assert_eq!(balance, U256::from(1000), "Post-Dencun same-tx: Account should have received ETH");
Ok(())
}
/// Tests SELFDESTRUCT behavior pre-Dencun (Shanghai).
///
/// Pre-Dencun:
/// - SELFDESTRUCT deletes contract code and storage regardless of when contract was created
/// - The output state MUST mark the account as destroyed
///
/// This test verifies:
/// 1. Contract deploys with storage
/// 2. SELFDESTRUCT deletes code and storage
/// 3. Output state shows account IS destroyed
#[tokio::test]
async fn test_selfdestruct_pre_dencun() -> eyre::Result<()> {
reth_tracing::init_test_tracing();
let tree_config = TreeConfig::default().without_prewarming(true).without_state_cache(false);
let (mut nodes, _tasks, wallet) = setup_engine::<EthereumNode>(
1,
shanghai_spec(),
false,
tree_config,
eth_payload_attributes_shanghai,
)
.await?;
let mut node = nodes.pop().unwrap();
let signer = wallet.inner.clone();
let provider = ProviderBuilder::new()
.wallet(EthereumWallet::new(signer.clone()))
.connect_http(node.rpc_url());
// Deploy contract that stores 0x42 at slot 0 and selfdestructs on any call
let pending = provider
.send_transaction(deploy_tx(signer.address(), 0, selfdestruct_contract_init_code()))
.await?;
node.advance_block().await?;
let receipt = pending.get_receipt().await?;
assert!(receipt.status(), "Contract deployment should succeed");
let contract_address = receipt.contract_address.expect("Should have contract address");
// Consume the canonical notification for deployment block
let _ = node.canonical_stream.next().await;
// Trigger SELFDESTRUCT by calling the contract
let pending = provider.send_transaction(call_tx(signer.address(), contract_address, 1)).await?;
node.advance_block().await?;
let receipt = pending.get_receipt().await?;
assert!(receipt.status(), "Selfdestruct tx should succeed");
// Get the canonical notification for the selfdestruct block
let notification = node.canonical_stream.next().await.unwrap();
let chain = notification.committed();
let execution_outcome = chain.execution_outcome();
// Verify the output state: pre-Dencun, account MUST be destroyed
let account_state: Option<&BundleAccount> = execution_outcome.bundle.account(&contract_address);
assert!(
account_state.is_some_and(|a: &BundleAccount| a.was_destroyed()),
"Pre-Dencun: Account MUST be marked as destroyed in output state"
);
// Verify via RPC that code and storage are cleared
let code_after = provider.get_code_at(contract_address).await?;
assert!(code_after.is_empty(), "Pre-Dencun: Contract code should be deleted");
let slot0_after = provider.get_storage_at(contract_address, U256::ZERO).await?;
assert_eq!(slot0_after, U256::ZERO, "Pre-Dencun: Storage should be cleared");
// Send ETH to the destroyed contract address in a new block.
// This tests cache behavior - the cache should correctly reflect the account was destroyed.
// Pre-Dencun: the contract no longer exists, so this is just a plain ETH transfer.
let pending = provider
.send_transaction(transfer_tx(signer.address(), contract_address, 2, U256::from(1000)))
.await?;
node.advance_block().await?;
let receipt = pending.get_receipt().await?;
assert!(receipt.status(), "ETH transfer to destroyed contract address should succeed");
// Consume the canonical notification
let notification = node.canonical_stream.next().await.unwrap();
let chain = notification.committed();
let execution_outcome = chain.execution_outcome();
// Verify the output state shows the account exists (received ETH) but has no code
let account_state: Option<&BundleAccount> = execution_outcome.bundle.account(&contract_address);
// After receiving ETH, the account should exist with balance but no code
assert!(
account_state.is_some(),
"Pre-Dencun: Account should exist after receiving ETH (even though contract was destroyed)"
);
// Verify code is still empty (contract was destroyed, only ETH was received)
let code_final = provider.get_code_at(contract_address).await?;
assert!(code_final.is_empty(), "Pre-Dencun: Contract code should remain deleted");
// Verify storage is still cleared
let slot0_final = provider.get_storage_at(contract_address, U256::ZERO).await?;
assert_eq!(slot0_final, U256::ZERO, "Pre-Dencun: Storage should remain cleared");
// Verify the account now has the ETH balance we sent
let balance = provider.get_balance(contract_address).await?;
assert_eq!(balance, U256::from(1000), "Pre-Dencun: Account should have received ETH");
Ok(())
}
/// Tests SELFDESTRUCT in same transaction as creation, where account previously had ETH
/// (post-Dencun).
///
/// Post-Dencun (EIP-6780):
/// - The same-tx exception applies when the CONTRACT is created in that transaction
/// - Even if the address previously had ETH (as an EOA), deploying a contract there and
/// selfdestructing in the same tx DOES delete the contract
/// - The "created in same tx" refers to contract creation, not account existence
///
/// This test verifies:
/// 1. Send ETH to the future contract address (address has balance but no code)
/// 2. Deploy contract that selfdestructs during constructor to that address
/// 3. Contract is deleted (same-tx exception applies - contract was created this tx)
/// 4. Code and storage are cleared
/// 5. Since account existed in DB before (had ETH), bundle marks it as Destroyed
#[tokio::test]
async fn test_selfdestruct_same_tx_preexisting_account_post_dencun() -> eyre::Result<()> {
reth_tracing::init_test_tracing();
let tree_config = TreeConfig::default().without_prewarming(true).without_state_cache(false);
let (mut nodes, _tasks, wallet) =
setup_engine::<EthereumNode>(1, cancun_spec(), false, tree_config, eth_payload_attributes)
.await?;
let mut node = nodes.pop().unwrap();
let signer = wallet.inner.clone();
let provider = ProviderBuilder::new()
.wallet(EthereumWallet::new(signer.clone()))
.connect_http(node.rpc_url());
// Calculate where the contract will be deployed (CREATE uses sender + nonce)
// We'll use nonce 1 for deployment, so first send ETH with nonce 0
let future_contract_address = signer.address().create(1);
// Send ETH to the future contract address first (makes it a pre-existing account)
let pending = provider
.send_transaction(transfer_tx(
signer.address(),
future_contract_address,
0,
U256::from(1000),
))
.await?;
node.advance_block().await?;
let receipt = pending.get_receipt().await?;
assert!(receipt.status(), "ETH transfer should succeed");
// Consume the canonical notification
let _ = node.canonical_stream.next().await;
// Verify the account exists and has balance
let balance_before = provider.get_balance(future_contract_address).await?;
assert_eq!(balance_before, U256::from(1000), "Account should have ETH before deployment");
// Now deploy contract that selfdestructs during its constructor to the same address
let pending = provider
.send_transaction(deploy_tx(signer.address(), 1, selfdestruct_in_constructor_init_code()))
.await?;
node.advance_block().await?;
let receipt = pending.get_receipt().await?;
assert!(receipt.status(), "Contract deployment with selfdestruct should succeed");
// Verify deployment went to the expected address
assert_eq!(
receipt.contract_address,
Some(future_contract_address),
"Contract should be deployed to pre-computed address"
);
// Get the canonical notification for the deployment block
let notification = node.canonical_stream.next().await.unwrap();
let chain = notification.committed();
let execution_outcome = chain.execution_outcome();
// Verify the output state: same-tx exception DOES apply because contract was created this tx
// The account should be marked as destroyed. Since it had prior state (ETH balance),
// the bundle will contain it with status Destroyed and original_info set.
let account_state: Option<&BundleAccount> =
execution_outcome.bundle.account(&future_contract_address);
assert!(
account_state.is_some_and(|a| a.was_destroyed()),
"Post-Dencun same-tx with prior ETH: Account MUST be marked as destroyed"
);
// Verify via RPC that code and storage are cleared
let code = provider.get_code_at(future_contract_address).await?;
assert!(code.is_empty(), "Post-Dencun same-tx: Contract code should be deleted");
let slot0 = provider.get_storage_at(future_contract_address, U256::ZERO).await?;
assert_eq!(slot0, U256::ZERO, "Post-Dencun same-tx: Storage should be cleared");
// Balance should be zero (sent to beneficiary during SELFDESTRUCT)
let balance_after = provider.get_balance(future_contract_address).await?;
assert_eq!(
balance_after,
U256::ZERO,
"Post-Dencun same-tx: Balance should be zero (sent to beneficiary)"
);
// Send ETH to the destroyed address to verify cache behavior
let pending = provider
.send_transaction(transfer_tx(
signer.address(),
future_contract_address,
2,
U256::from(2000),
))
.await?;
node.advance_block().await?;
let receipt = pending.get_receipt().await?;
assert!(receipt.status(), "ETH transfer should succeed");
// Consume notification
let _ = node.canonical_stream.next().await;
// Verify the account received ETH and has no code (it's now just an EOA)
let balance_final = provider.get_balance(future_contract_address).await?;
assert_eq!(balance_final, U256::from(2000), "Account should have received ETH");
let code_final = provider.get_code_at(future_contract_address).await?;
assert!(code_final.is_empty(), "Code should remain empty after ETH transfer");
let slot0_final = provider.get_storage_at(future_contract_address, U256::ZERO).await?;
assert_eq!(slot0_final, U256::ZERO, "Storage should remain cleared");
Ok(())
}

View File

@@ -29,19 +29,6 @@ pub(crate) fn eth_payload_attributes(timestamp: u64) -> EthPayloadBuilderAttribu
EthPayloadBuilderAttributes::new(B256::ZERO, attributes)
}
/// Helper function to create pre-Cancun (Shanghai) payload attributes.
/// No `parent_beacon_block_root` field.
pub(crate) fn eth_payload_attributes_shanghai(timestamp: u64) -> EthPayloadBuilderAttributes {
let attributes = PayloadAttributes {
timestamp,
prev_randao: B256::ZERO,
suggested_fee_recipient: Address::ZERO,
withdrawals: Some(vec![]),
parent_beacon_block_root: None,
};
EthPayloadBuilderAttributes::new(B256::ZERO, attributes)
}
/// Advances node by producing blocks with random transactions.
pub(crate) async fn advance_with_random_transactions<Provider>(
node: &mut NodeHelperType<EthereumNode, Provider>,

View File

@@ -75,11 +75,9 @@ pub trait Executor<DB: Database>: Sized {
where
I: IntoIterator<Item = &'a RecoveredBlock<<Self::Primitives as NodePrimitives>::Block>>,
{
let blocks_iter = blocks.into_iter();
let capacity = blocks_iter.size_hint().0;
let mut results = Vec::with_capacity(capacity);
let mut results = Vec::new();
let mut first_block = None;
for block in blocks_iter {
for block in blocks {
if first_block.is_none() {
first_block = Some(block.header().number());
}

View File

@@ -35,7 +35,7 @@ use reth_execution_errors::BlockExecutionError;
use reth_primitives_traits::{
BlockTy, HeaderTy, NodePrimitives, ReceiptTy, SealedBlock, SealedHeader, TxTy,
};
use revm::{context::TxEnv, database::State, primitives::hardfork::SpecId};
use revm::{context::TxEnv, database::State};
pub mod either;
/// EVM environment configuration.
@@ -203,7 +203,6 @@ pub trait ConfigureEvm: Clone + Debug + Send + Sync + Unpin {
+ FromRecoveredTx<TxTy<Self::Primitives>>
+ FromTxWithEncoded<TxTy<Self::Primitives>>,
Precompiles = PrecompilesMap,
Spec: Into<SpecId>,
>,
>;

View File

@@ -66,17 +66,13 @@ use tokio::sync::mpsc::{Sender, UnboundedReceiver};
#[non_exhaustive]
pub struct TestPoolBuilder;
impl<Node, Evm: Send> PoolBuilder<Node, Evm> for TestPoolBuilder
impl<Node> PoolBuilder<Node> for TestPoolBuilder
where
Node: FullNodeTypes<Types: NodeTypes<Primitives: NodePrimitives<SignedTx = TransactionSigned>>>,
{
type Pool = TestPool;
async fn build_pool(
self,
_ctx: &BuilderContext<Node>,
_evm_config: Evm,
) -> eyre::Result<Self::Pool> {
async fn build_pool(self, _ctx: &BuilderContext<Node>) -> eyre::Result<Self::Pool> {
Ok(testing_pool())
}
}

View File

@@ -1631,7 +1631,7 @@ impl Discv4Service {
.filter(|entry| entry.node.value.is_expired())
.map(|n| n.node.value)
.collect::<Vec<_>>();
nodes.sort_by_key(|a| a.last_seen);
nodes.sort_by(|a, b| a.last_seen.cmp(&b.last_seen));
let to_ping = nodes.into_iter().map(|n| n.record).take(MAX_NODES_PING).collect::<Vec<_>>();
for node in to_ping {
self.try_ping(node, PingReason::RePing)

View File

@@ -14,7 +14,6 @@ workspace = true
[dependencies]
# reth
reth-chainspec.workspace = true
reth-evm-ethereum = { workspace = true, optional = true }
reth-fs-util.workspace = true
reth-primitives-traits.workspace = true
reth-net-banlist.workspace = true
@@ -137,8 +136,6 @@ test-utils = [
"reth-primitives-traits/test-utils",
"reth-provider/test-utils",
"reth-ethereum-primitives/test-utils",
"dep:reth-evm-ethereum",
"reth-evm-ethereum?/test-utils",
]
[[bench]]

View File

@@ -19,7 +19,6 @@ use reth_eth_wire::{
protocol::Protocol, DisconnectReason, EthNetworkPrimitives, HelloMessageWithProtocols,
};
use reth_ethereum_primitives::{PooledTransactionVariant, TransactionSigned};
use reth_evm_ethereum::EthEvmConfig;
use reth_network_api::{
events::{PeerEvent, SessionInfo},
test_utils::{PeersHandle, PeersHandleProvider},
@@ -183,20 +182,17 @@ where
C: ChainSpecProvider<ChainSpec: EthereumHardforks>
+ StateProviderFactory
+ BlockReaderIdExt
+ HeaderProvider<Header = alloy_consensus::Header>
+ HeaderProvider
+ Clone
+ 'static,
Pool: TransactionPool,
{
/// Installs an eth pool on each peer
pub fn with_eth_pool(
self,
) -> Testnet<C, EthTransactionPool<C, InMemoryBlobStore, EthEvmConfig>> {
pub fn with_eth_pool(self) -> Testnet<C, EthTransactionPool<C, InMemoryBlobStore>> {
self.map_pool(|peer| {
let blob_store = InMemoryBlobStore::default();
let pool = TransactionValidationTaskExecutor::eth(
peer.client.clone(),
EthEvmConfig::mainnet(),
blob_store.clone(),
TokioTaskExecutor::default(),
);
@@ -212,7 +208,7 @@ where
pub fn with_eth_pool_config(
self,
tx_manager_config: TransactionsManagerConfig,
) -> Testnet<C, EthTransactionPool<C, InMemoryBlobStore, EthEvmConfig>> {
) -> Testnet<C, EthTransactionPool<C, InMemoryBlobStore>> {
self.with_eth_pool_config_and_policy(tx_manager_config, Default::default())
}
@@ -221,12 +217,11 @@ where
self,
tx_manager_config: TransactionsManagerConfig,
policy: TransactionPropagationKind,
) -> Testnet<C, EthTransactionPool<C, InMemoryBlobStore, EthEvmConfig>> {
) -> Testnet<C, EthTransactionPool<C, InMemoryBlobStore>> {
self.map_pool(|peer| {
let blob_store = InMemoryBlobStore::default();
let pool = TransactionValidationTaskExecutor::eth(
peer.client.clone(),
EthEvmConfig::mainnet(),
blob_store.clone(),
TokioTaskExecutor::default(),
);

View File

@@ -188,7 +188,13 @@ impl<N: NetworkPrimitives> TransactionFetcher<N> {
let TxFetchMetadata { fallback_peers, .. } =
self.hashes_fetch_inflight_and_pending_fetch.peek(&hash)?;
fallback_peers.iter().find(|peer_id| self.is_idle(peer_id))
for peer_id in fallback_peers.iter() {
if self.is_idle(peer_id) {
return Some(peer_id)
}
}
None
}
/// Returns any idle peer for any hash pending fetch. If one is found, the corresponding

View File

@@ -20,7 +20,6 @@ use reth_network_p2p::{
};
use reth_network_peers::{mainnet_nodes, NodeRecord, TrustedPeer};
use reth_network_types::peers::config::PeerBackoffDurations;
use reth_provider::test_utils::MockEthProvider;
use reth_storage_api::noop::NoopProvider;
use reth_tracing::init_test_tracing;
use reth_transaction_pool::test_utils::testing_pool;
@@ -656,8 +655,7 @@ async fn new_random_peer(
async fn test_connect_many() {
reth_tracing::init_test_tracing();
let provider = MockEthProvider::default().with_genesis_block();
let net = Testnet::create_with(5, provider).await;
let net = Testnet::create_with(5, NoopProvider::default()).await;
// install request handlers
let net = net.with_eth_pool();

View File

@@ -22,7 +22,7 @@ use tokio::join;
async fn test_tx_gossip() {
reth_tracing::init_test_tracing();
let provider = MockEthProvider::default().with_genesis_block();
let provider = MockEthProvider::default();
let net = Testnet::create_with(2, provider.clone()).await;
// install request handlers
@@ -61,7 +61,7 @@ async fn test_tx_gossip() {
async fn test_tx_propagation_policy_trusted_only() {
reth_tracing::init_test_tracing();
let provider = MockEthProvider::default().with_genesis_block();
let provider = MockEthProvider::default();
let policy = TransactionPropagationKind::Trusted;
let net = Testnet::create_with(2, provider.clone()).await;
@@ -129,7 +129,7 @@ async fn test_tx_propagation_policy_trusted_only() {
async fn test_tx_ingress_policy_trusted_only() {
reth_tracing::init_test_tracing();
let provider = MockEthProvider::default().with_genesis_block();
let provider = MockEthProvider::default();
let tx_manager_config = TransactionsManagerConfig {
ingress_policy: TransactionIngressPolicy::Trusted,
@@ -195,7 +195,7 @@ async fn test_tx_ingress_policy_trusted_only() {
#[tokio::test(flavor = "multi_thread")]
async fn test_4844_tx_gossip_penalization() {
reth_tracing::init_test_tracing();
let provider = MockEthProvider::default().with_genesis_block();
let provider = MockEthProvider::default();
let net = Testnet::create_with(2, provider.clone()).await;
// install request handlers
@@ -246,7 +246,7 @@ async fn test_4844_tx_gossip_penalization() {
#[tokio::test(flavor = "multi_thread")]
async fn test_sending_invalid_transactions() {
reth_tracing::init_test_tracing();
let provider = MockEthProvider::default().with_genesis_block();
let provider = MockEthProvider::default();
let net = Testnet::create_with(2, provider.clone()).await;
// install request handlers
let net = net.with_eth_pool();

View File

@@ -571,8 +571,8 @@ where
debug!(target: "downloaders", %err, ?this.start_hash, "Body range download failed");
}
}
if this.request.bodies.is_none() && !this.is_bodies_complete() {
// no pending bodies request (e.g., request error), retry remaining bodies
if this.bodies.is_empty() {
// received bad response, re-request headers
// TODO: convert this into two futures, one which is a headers range
// future, and one which is a bodies range future.
//
@@ -751,12 +751,8 @@ mod tests {
use reth_ethereum_primitives::BlockBody;
use super::*;
use crate::{error::RequestError, test_utils::TestFullBlockClient};
use std::{
ops::Range,
sync::atomic::{AtomicUsize, Ordering},
};
use tokio::time::{timeout, Duration};
use crate::test_utils::TestFullBlockClient;
use std::ops::Range;
#[tokio::test]
async fn download_single_full_block() {
@@ -804,65 +800,6 @@ mod tests {
(sealed_header, body)
}
#[derive(Clone, Debug)]
struct FailingBodiesClient {
inner: TestFullBlockClient,
fail_on: usize,
body_requests: Arc<AtomicUsize>,
}
impl FailingBodiesClient {
fn new(inner: TestFullBlockClient, fail_on: usize) -> Self {
Self { inner, fail_on, body_requests: Arc::new(AtomicUsize::new(0)) }
}
}
impl DownloadClient for FailingBodiesClient {
fn report_bad_message(&self, peer_id: PeerId) {
self.inner.report_bad_message(peer_id);
}
fn num_connected_peers(&self) -> usize {
self.inner.num_connected_peers()
}
}
impl HeadersClient for FailingBodiesClient {
type Header = <TestFullBlockClient as HeadersClient>::Header;
type Output = <TestFullBlockClient as HeadersClient>::Output;
fn get_headers_with_priority(
&self,
request: HeadersRequest,
priority: Priority,
) -> Self::Output {
self.inner.get_headers_with_priority(request, priority)
}
}
impl BodiesClient for FailingBodiesClient {
type Body = <TestFullBlockClient as BodiesClient>::Body;
type Output = <TestFullBlockClient as BodiesClient>::Output;
fn get_block_bodies_with_priority_and_range_hint(
&self,
hashes: Vec<B256>,
priority: Priority,
range_hint: Option<RangeInclusive<u64>>,
) -> Self::Output {
let attempt = self.body_requests.fetch_add(1, Ordering::SeqCst);
if attempt == self.fail_on {
return futures::future::ready(Err(RequestError::Timeout))
}
self.inner.get_block_bodies_with_priority_and_range_hint(hashes, priority, range_hint)
}
}
impl BlockClient for FailingBodiesClient {
type Block = reth_ethereum_primitives::Block;
}
#[tokio::test]
async fn download_full_block_range() {
let client = TestFullBlockClient::default();
@@ -900,25 +837,6 @@ mod tests {
}
}
#[tokio::test]
async fn download_full_block_range_retries_after_body_error() {
let mut client = TestFullBlockClient::default();
client.set_soft_limit(2);
let (header, _) = insert_headers_into_client(&client, 0..3);
let client = FailingBodiesClient::new(client, 1);
let body_requests = Arc::clone(&client.body_requests);
let client = FullBlockClient::test_client(client);
let received =
timeout(Duration::from_secs(1), client.get_full_block_range(header.hash(), 3))
.await
.expect("body request retry should complete");
assert_eq!(received.len(), 3);
assert_eq!(body_requests.load(Ordering::SeqCst), 3);
}
#[tokio::test]
async fn download_full_block_range_with_invalid_header() {
let client = TestFullBlockClient::default();

View File

@@ -62,12 +62,12 @@ impl<Node, PoolB, PayloadB, NetworkB, ExecB, ConsB>
pool_builder,
payload_builder,
network_builder,
executor_builder,
executor_builder: evm_builder,
consensus_builder,
_marker,
} = self;
ComponentsBuilder {
executor_builder,
executor_builder: evm_builder,
pool_builder,
payload_builder,
network_builder,
@@ -149,12 +149,15 @@ where
pub fn pool<PB>(
self,
pool_builder: PB,
) -> ComponentsBuilder<Node, PB, PayloadB, NetworkB, ExecB, ConsB> {
) -> ComponentsBuilder<Node, PB, PayloadB, NetworkB, ExecB, ConsB>
where
PB: PoolBuilder<Node>,
{
let Self {
pool_builder: _,
payload_builder,
network_builder,
executor_builder,
executor_builder: evm_builder,
consensus_builder,
_marker,
} = self;
@@ -162,7 +165,7 @@ where
pool_builder,
payload_builder,
network_builder,
executor_builder,
executor_builder: evm_builder,
consensus_builder,
_marker,
}
@@ -182,6 +185,72 @@ where
_marker: self._marker,
}
}
}
impl<Node, PoolB, PayloadB, NetworkB, ExecB, ConsB>
ComponentsBuilder<Node, PoolB, PayloadB, NetworkB, ExecB, ConsB>
where
Node: FullNodeTypes,
PoolB: PoolBuilder<Node>,
{
/// Configures the network builder.
///
/// This accepts a [`NetworkBuilder`] instance that will be used to create the node's network
/// stack.
pub fn network<NB>(
self,
network_builder: NB,
) -> ComponentsBuilder<Node, PoolB, PayloadB, NB, ExecB, ConsB>
where
NB: NetworkBuilder<Node, PoolB::Pool>,
{
let Self {
pool_builder,
payload_builder,
network_builder: _,
executor_builder: evm_builder,
consensus_builder,
_marker,
} = self;
ComponentsBuilder {
pool_builder,
payload_builder,
network_builder,
executor_builder: evm_builder,
consensus_builder,
_marker,
}
}
/// Configures the payload builder.
///
/// This accepts a [`PayloadServiceBuilder`] instance that will be used to create the node's
/// payload builder service.
pub fn payload<PB>(
self,
payload_builder: PB,
) -> ComponentsBuilder<Node, PoolB, PB, NetworkB, ExecB, ConsB>
where
ExecB: ExecutorBuilder<Node>,
PB: PayloadServiceBuilder<Node, PoolB::Pool, ExecB::EVM>,
{
let Self {
pool_builder,
payload_builder: _,
network_builder,
executor_builder: evm_builder,
consensus_builder,
_marker,
} = self;
ComponentsBuilder {
pool_builder,
payload_builder,
network_builder,
executor_builder: evm_builder,
consensus_builder,
_marker,
}
}
/// Configures the executor builder.
///
@@ -229,72 +298,7 @@ where
network_builder,
executor_builder,
consensus_builder: _,
_marker,
} = self;
ComponentsBuilder {
pool_builder,
payload_builder,
network_builder,
executor_builder,
consensus_builder,
_marker,
}
}
}
impl<Node, PoolB, PayloadB, NetworkB, ExecB, ConsB>
ComponentsBuilder<Node, PoolB, PayloadB, NetworkB, ExecB, ConsB>
where
Node: FullNodeTypes,
ExecB: ExecutorBuilder<Node>,
PoolB: PoolBuilder<Node, ExecB::EVM>,
{
/// Configures the network builder.
///
/// This accepts a [`NetworkBuilder`] instance that will be used to create the node's network
/// stack.
pub fn network<NB>(
self,
network_builder: NB,
) -> ComponentsBuilder<Node, PoolB, PayloadB, NB, ExecB, ConsB>
where
NB: NetworkBuilder<Node, PoolB::Pool>,
{
let Self {
pool_builder,
payload_builder,
network_builder: _,
executor_builder,
consensus_builder,
_marker,
} = self;
ComponentsBuilder {
pool_builder,
payload_builder,
network_builder,
executor_builder,
consensus_builder,
_marker,
}
}
/// Configures the payload builder.
///
/// This accepts a [`PayloadServiceBuilder`] instance that will be used to create the node's
/// payload builder service.
pub fn payload<PB>(
self,
payload_builder: PB,
) -> ComponentsBuilder<Node, PoolB, PB, NetworkB, ExecB, ConsB>
where
PB: PayloadServiceBuilder<Node, PoolB::Pool, ExecB::EVM>,
{
let Self {
pool_builder,
payload_builder: _,
network_builder,
executor_builder,
consensus_builder,
_marker,
} = self;
ComponentsBuilder {
@@ -354,7 +358,7 @@ impl<Node, PoolB, PayloadB, NetworkB, ExecB, ConsB> NodeComponentsBuilder<Node>
for ComponentsBuilder<Node, PoolB, PayloadB, NetworkB, ExecB, ConsB>
where
Node: FullNodeTypes,
PoolB: PoolBuilder<Node, ExecB::EVM, Pool: TransactionPool>,
PoolB: PoolBuilder<Node, Pool: TransactionPool>,
NetworkB: NetworkBuilder<
Node,
PoolB::Pool,
@@ -380,13 +384,13 @@ where
pool_builder,
payload_builder,
network_builder,
executor_builder,
executor_builder: evm_builder,
consensus_builder,
_marker,
} = self;
let evm_config = executor_builder.build_evm(context).await?;
let pool = pool_builder.build_pool(context, evm_config.clone()).await?;
let evm_config = evm_builder.build_evm(context).await?;
let pool = pool_builder.build_pool(context).await?;
let network = network_builder.build_network(context, pool.clone()).await?;
let payload_builder_handle = payload_builder
.spawn_payload_builder_service(context, pool.clone(), evm_config.clone())
@@ -467,19 +471,14 @@ where
#[derive(Debug, Clone)]
pub struct NoopTransactionPoolBuilder<Tx = EthPooledTransaction>(PhantomData<Tx>);
impl<N, Tx, Evm> PoolBuilder<N, Evm> for NoopTransactionPoolBuilder<Tx>
impl<N, Tx> PoolBuilder<N> for NoopTransactionPoolBuilder<Tx>
where
N: FullNodeTypes,
Tx: EthPoolTransaction<Consensus = TxTy<N::Types>> + Unpin,
Evm: Send,
{
type Pool = NoopTransactionPool<Tx>;
async fn build_pool(
self,
_ctx: &BuilderContext<N>,
_evm_config: Evm,
) -> eyre::Result<Self::Pool> {
async fn build_pool(self, _ctx: &BuilderContext<N>) -> eyre::Result<Self::Pool> {
Ok(NoopTransactionPool::<Tx>::new())
}
}

View File

@@ -4,7 +4,7 @@ use crate::{BuilderContext, FullNodeTypes};
use alloy_primitives::Address;
use reth_chain_state::CanonStateSubscriptions;
use reth_chainspec::EthereumHardforks;
use reth_node_api::{BlockTy, NodeTypes, TxTy};
use reth_node_api::{NodeTypes, TxTy};
use reth_transaction_pool::{
blobstore::DiskFileBlobStore, BlobStore, CoinbaseTipOrdering, PoolConfig, PoolTransaction,
SubPoolLimit, TransactionPool, TransactionValidationTaskExecutor, TransactionValidator,
@@ -12,7 +12,7 @@ use reth_transaction_pool::{
use std::{collections::HashSet, future::Future};
/// A type that knows how to build the transaction pool.
pub trait PoolBuilder<Node: FullNodeTypes, Evm>: Send {
pub trait PoolBuilder<Node: FullNodeTypes>: Send {
/// The transaction pool to build.
type Pool: TransactionPool<Transaction: PoolTransaction<Consensus = TxTy<Node::Types>>>
+ Unpin
@@ -22,17 +22,16 @@ pub trait PoolBuilder<Node: FullNodeTypes, Evm>: Send {
fn build_pool(
self,
ctx: &BuilderContext<Node>,
evm_config: Evm,
) -> impl Future<Output = eyre::Result<Self::Pool>> + Send;
}
impl<Node, F, Fut, Pool, Evm> PoolBuilder<Node, Evm> for F
impl<Node, F, Fut, Pool> PoolBuilder<Node> for F
where
Node: FullNodeTypes,
Pool: TransactionPool<Transaction: PoolTransaction<Consensus = TxTy<Node::Types>>>
+ Unpin
+ 'static,
F: FnOnce(&BuilderContext<Node>, Evm) -> Fut + Send,
F: FnOnce(&BuilderContext<Node>) -> Fut + Send,
Fut: Future<Output = eyre::Result<Pool>> + Send,
{
type Pool = Pool;
@@ -40,9 +39,8 @@ where
fn build_pool(
self,
ctx: &BuilderContext<Node>,
evm_config: Evm,
) -> impl Future<Output = eyre::Result<Self::Pool>> {
self(ctx, evm_config)
self(ctx)
}
}
@@ -131,7 +129,7 @@ impl<'a, Node: FullNodeTypes, V> TxPoolBuilder<'a, Node, V> {
impl<'a, Node, V> TxPoolBuilder<'a, Node, TransactionValidationTaskExecutor<V>>
where
Node: FullNodeTypes<Types: NodeTypes<ChainSpec: EthereumHardforks>>,
V: TransactionValidator<Block = BlockTy<Node::Types>> + 'static,
V: TransactionValidator + 'static,
V::Transaction:
PoolTransaction<Consensus = TxTy<Node::Types>> + reth_transaction_pool::EthPoolTransaction,
{
@@ -250,7 +248,7 @@ fn spawn_pool_maintenance_task<Node, Pool>(
) -> eyre::Result<()>
where
Node: FullNodeTypes<Types: NodeTypes<ChainSpec: EthereumHardforks>>,
Pool: reth_transaction_pool::TransactionPoolExt<Block = BlockTy<Node::Types>> + Clone + 'static,
Pool: reth_transaction_pool::TransactionPoolExt + Clone + 'static,
Pool::Transaction: PoolTransaction<Consensus = TxTy<Node::Types>>,
{
let chain_events = ctx.provider().canonical_state_stream();
@@ -282,7 +280,7 @@ pub fn spawn_maintenance_tasks<Node, Pool>(
) -> eyre::Result<()>
where
Node: FullNodeTypes<Types: NodeTypes<ChainSpec: EthereumHardforks>>,
Pool: reth_transaction_pool::TransactionPoolExt<Block = BlockTy<Node::Types>> + Clone + 'static,
Pool: reth_transaction_pool::TransactionPoolExt + Clone + 'static,
Pool::Transaction: PoolTransaction<Consensus = TxTy<Node::Types>>,
{
spawn_local_backup_task(ctx, pool.clone())?;

View File

@@ -22,8 +22,9 @@ pub struct DefaultEngineValues {
legacy_state_root_task_enabled: bool,
state_cache_disabled: bool,
prewarming_disabled: bool,
parallel_sparse_trie_disabled: bool,
state_provider_metrics: bool,
cross_block_cache_size: usize,
cross_block_cache_size: u64,
state_root_task_compare_updates: bool,
accept_execution_requests_hash: bool,
multiproof_chunking_enabled: bool,
@@ -80,6 +81,12 @@ impl DefaultEngineValues {
self
}
/// Set whether to disable parallel sparse trie by default
pub const fn with_parallel_sparse_trie_disabled(mut self, v: bool) -> Self {
self.parallel_sparse_trie_disabled = v;
self
}
/// Set whether to enable state provider metrics by default
pub const fn with_state_provider_metrics(mut self, v: bool) -> Self {
self.state_provider_metrics = v;
@@ -87,7 +94,7 @@ impl DefaultEngineValues {
}
/// Set the default cross-block cache size in MB
pub const fn with_cross_block_cache_size(mut self, v: usize) -> Self {
pub const fn with_cross_block_cache_size(mut self, v: u64) -> Self {
self.cross_block_cache_size = v;
self
}
@@ -182,6 +189,7 @@ impl Default for DefaultEngineValues {
legacy_state_root_task_enabled: false,
state_cache_disabled: false,
prewarming_disabled: false,
parallel_sparse_trie_disabled: false,
state_provider_metrics: false,
cross_block_cache_size: DEFAULT_CROSS_BLOCK_CACHE_SIZE_MB,
state_root_task_compare_updates: false,
@@ -236,14 +244,14 @@ pub struct EngineArgs {
#[arg(long = "engine.disable-prewarming", alias = "engine.disable-caching-and-prewarming", default_value_t = DefaultEngineValues::get_global().prewarming_disabled)]
pub prewarming_disabled: bool,
/// CAUTION: This CLI flag has no effect anymore. The parallel sparse trie is always enabled.
/// CAUTION: This CLI flag has no effect anymore, use --engine.disable-parallel-sparse-trie
/// if you want to disable usage of the `ParallelSparseTrie`.
#[deprecated]
#[arg(long = "engine.parallel-sparse-trie", default_value = "true", hide = true)]
pub parallel_sparse_trie_enabled: bool,
/// CAUTION: This CLI flag has no effect anymore. The parallel sparse trie is always enabled.
#[deprecated]
#[arg(long = "engine.disable-parallel-sparse-trie", default_value = "false", hide = true)]
/// Disable the parallel sparse trie in the engine.
#[arg(long = "engine.disable-parallel-sparse-trie", default_value_t = DefaultEngineValues::get_global().parallel_sparse_trie_disabled)]
pub parallel_sparse_trie_disabled: bool,
/// Enable state provider latency metrics. This allows the engine to collect and report stats
@@ -254,7 +262,7 @@ pub struct EngineArgs {
/// Configure the size of cross-block cache in megabytes
#[arg(long = "engine.cross-block-cache-size", default_value_t = DefaultEngineValues::get_global().cross_block_cache_size)]
pub cross_block_cache_size: usize,
pub cross_block_cache_size: u64,
/// Enable comparing trie updates from the state root task to the trie updates from the regular
/// state root calculation.
@@ -335,6 +343,7 @@ impl Default for EngineArgs {
legacy_state_root_task_enabled,
state_cache_disabled,
prewarming_disabled,
parallel_sparse_trie_disabled,
state_provider_metrics,
cross_block_cache_size,
state_root_task_compare_updates,
@@ -360,7 +369,7 @@ impl Default for EngineArgs {
state_cache_disabled,
prewarming_disabled,
parallel_sparse_trie_enabled: true,
parallel_sparse_trie_disabled: false,
parallel_sparse_trie_disabled,
state_provider_metrics,
cross_block_cache_size,
accept_execution_requests_hash,
@@ -389,6 +398,7 @@ impl EngineArgs {
.with_legacy_state_root(self.legacy_state_root_task_enabled)
.without_state_cache(self.state_cache_disabled)
.without_prewarming(self.prewarming_disabled)
.with_disable_parallel_sparse_trie(self.parallel_sparse_trie_disabled)
.with_state_provider_metrics(self.state_provider_metrics)
.with_always_compare_trie_updates(self.state_root_task_compare_updates)
.with_cross_block_cache_size(self.cross_block_cache_size * 1024 * 1024)
@@ -447,7 +457,7 @@ mod tests {
state_cache_disabled: true,
prewarming_disabled: true,
parallel_sparse_trie_enabled: true,
parallel_sparse_trie_disabled: false,
parallel_sparse_trie_disabled: true,
state_provider_metrics: true,
cross_block_cache_size: 256,
state_root_task_compare_updates: true,
@@ -475,6 +485,7 @@ mod tests {
"--engine.legacy-state-root",
"--engine.disable-state-cache",
"--engine.disable-prewarming",
"--engine.disable-parallel-sparse-trie",
"--engine.state-provider-metrics",
"--engine.cross-block-cache-size",
"256",

View File

@@ -1025,7 +1025,6 @@ mod tests {
max_receipts: 2000,
max_headers: 1000,
max_concurrent_db_requests: 512,
max_cached_tx_hashes: 30_000,
},
gas_price_oracle: GasPriceOracleArgs {
blocks: 20,

View File

@@ -1,7 +1,7 @@
use clap::Args;
use reth_rpc_server_types::constants::cache::{
DEFAULT_BLOCK_CACHE_MAX_LEN, DEFAULT_CONCURRENT_DB_REQUESTS, DEFAULT_HEADER_CACHE_MAX_LEN,
DEFAULT_MAX_CACHED_TX_HASHES, DEFAULT_RECEIPT_CACHE_MAX_LEN,
DEFAULT_RECEIPT_CACHE_MAX_LEN,
};
/// Parameters to configure RPC state cache.
@@ -36,13 +36,6 @@ pub struct RpcStateCacheArgs {
default_value_t = DEFAULT_CONCURRENT_DB_REQUESTS,
)]
pub max_concurrent_db_requests: usize,
/// Maximum number of transaction hashes to cache for transaction lookups.
#[arg(
long = "rpc-cache.max-cached-tx-hashes",
default_value_t = DEFAULT_MAX_CACHED_TX_HASHES,
)]
pub max_cached_tx_hashes: u32,
}
impl RpcStateCacheArgs {
@@ -61,7 +54,6 @@ impl Default for RpcStateCacheArgs {
max_receipts: DEFAULT_RECEIPT_CACHE_MAX_LEN,
max_headers: DEFAULT_HEADER_CACHE_MAX_LEN,
max_concurrent_db_requests: DEFAULT_CONCURRENT_DB_REQUESTS,
max_cached_tx_hashes: DEFAULT_MAX_CACHED_TX_HASHES,
}
}
}

View File

@@ -39,7 +39,7 @@ pub use reth_engine_primitives::{
};
/// Default size of cross-block cache in megabytes.
pub const DEFAULT_CROSS_BLOCK_CACHE_SIZE_MB: usize = 4 * 1024;
pub const DEFAULT_CROSS_BLOCK_CACHE_SIZE_MB: u64 = 4 * 1024;
/// This includes all necessary configuration to launch the node.
/// The individual configuration options can be overwritten before launching the node.

View File

@@ -257,11 +257,6 @@ fn describe_rocksdb_metrics() {
Unit::Bytes,
"The size of memtables for a RocksDB table"
);
describe_gauge!(
"rocksdb.wal_size",
Unit::Bytes,
"The total size of WAL (Write-Ahead Log) files. Important: this is not included in table_size or sst_size metrics"
);
}
#[cfg(all(feature = "jemalloc", unix))]

View File

@@ -165,7 +165,6 @@ impl OpNode {
self.args;
ComponentsBuilder::default()
.node_types::<Node>()
.executor(OpExecutorBuilder::default())
.pool(
OpPoolBuilder::default()
.with_enable_tx_conditional(self.args.enable_tx_conditional)
@@ -174,6 +173,7 @@ impl OpNode {
self.args.supervisor_safety_level,
),
)
.executor(OpExecutorBuilder::default())
.payload(BasicPayloadServiceBuilder::new(
OpPayloadBuilder::new(compute_pending_block)
.with_da_config(self.da_config.clone())
@@ -957,19 +957,14 @@ impl<T> OpPoolBuilder<T> {
}
}
impl<Node, T, Evm> PoolBuilder<Node, Evm> for OpPoolBuilder<T>
impl<Node, T> PoolBuilder<Node> for OpPoolBuilder<T>
where
Node: FullNodeTypes<Types: NodeTypes<ChainSpec: OpHardforks>>,
T: EthPoolTransaction<Consensus = TxTy<Node::Types>> + OpPooledTx,
Evm: ConfigureEvm<Primitives = PrimitivesTy<Node::Types>> + Clone + 'static,
{
type Pool = OpTransactionPool<Node::Provider, DiskFileBlobStore, Evm, T>;
type Pool = OpTransactionPool<Node::Provider, DiskFileBlobStore, T>;
async fn build_pool(
self,
ctx: &BuilderContext<Node>,
evm_config: Evm,
) -> eyre::Result<Self::Pool> {
async fn build_pool(self, ctx: &BuilderContext<Node>) -> eyre::Result<Self::Pool> {
let Self { pool_config_overrides, .. } = self;
// supervisor used for interop
@@ -987,27 +982,27 @@ where
.await;
let blob_store = reth_node_builder::components::create_blob_store(ctx)?;
let validator =
TransactionValidationTaskExecutor::eth_builder(ctx.provider().clone(), evm_config)
.no_eip4844()
.with_max_tx_input_bytes(ctx.config().txpool.max_tx_input_bytes)
.kzg_settings(ctx.kzg_settings()?)
.set_tx_fee_cap(ctx.config().rpc.rpc_tx_fee_cap)
.with_max_tx_gas_limit(ctx.config().txpool.max_tx_gas_limit)
.with_minimum_priority_fee(ctx.config().txpool.minimum_priority_fee)
.with_additional_tasks(
pool_config_overrides
.additional_validation_tasks
.unwrap_or_else(|| ctx.config().txpool.additional_validation_tasks),
)
.build_with_tasks(ctx.task_executor().clone(), blob_store.clone())
.map(|validator| {
OpTransactionValidator::new(validator)
// In --dev mode we can't require gas fees because we're unable to decode
// the L1 block info
.require_l1_data_gas_fee(!ctx.config().dev.dev)
.with_supervisor(supervisor_client.clone())
});
let validator = TransactionValidationTaskExecutor::eth_builder(ctx.provider().clone())
.no_eip4844()
.with_head_timestamp(ctx.head().timestamp)
.with_max_tx_input_bytes(ctx.config().txpool.max_tx_input_bytes)
.kzg_settings(ctx.kzg_settings()?)
.set_tx_fee_cap(ctx.config().rpc.rpc_tx_fee_cap)
.with_max_tx_gas_limit(ctx.config().txpool.max_tx_gas_limit)
.with_minimum_priority_fee(ctx.config().txpool.minimum_priority_fee)
.with_additional_tasks(
pool_config_overrides
.additional_validation_tasks
.unwrap_or_else(|| ctx.config().txpool.additional_validation_tasks),
)
.build_with_tasks(ctx.task_executor().clone(), blob_store.clone())
.map(|validator| {
OpTransactionValidator::new(validator)
// In --dev mode we can't require gas fees because we're unable to decode
// the L1 block info
.require_l1_data_gas_fee(!ctx.config().dev.dev)
.with_supervisor(supervisor_client.clone())
});
let final_pool_config = pool_config_overrides.apply(ctx.pool_config());

View File

@@ -52,9 +52,9 @@
//! ComponentsBuilder::default()
//! .node_types::<RethFullAdapter<_, OpNode>>()
//! .noop_pool::<OpPooledTransaction>()
//! .executor(OpExecutorBuilder::default())
//! .noop_consensus()
//! .noop_network::<OpNetworkPrimitives>()
//! .noop_consensus()
//! .executor(OpExecutorBuilder::default())
//! .noop_payload(),
//! Box::new(()) as Box<dyn OnComponentInitializedHook<_>>,
//! )

View File

@@ -155,10 +155,6 @@ impl IsTyped2718 for OpTransactionSigned {
}
impl SignedTransaction for OpTransactionSigned {
fn is_system_tx(&self) -> bool {
self.is_deposit()
}
fn recalculate_hash(&self) -> B256 {
keccak256(self.encoded_2718())
}

View File

@@ -23,7 +23,6 @@ alloy-serde.workspace = true
# reth
reth-chainspec.workspace = true
reth-evm.workspace = true
reth-primitives-traits.workspace = true
reth-chain-state.workspace = true
reth-storage-api.workspace = true

View File

@@ -24,8 +24,8 @@ pub mod estimated_da_size;
use reth_transaction_pool::{CoinbaseTipOrdering, Pool, TransactionValidationTaskExecutor};
/// Type alias for default optimism transaction pool
pub type OpTransactionPool<Client, S, Evm, T = OpPooledTransaction> = Pool<
TransactionValidationTaskExecutor<OpTransactionValidator<Client, T, Evm>>,
pub type OpTransactionPool<Client, S, T = OpPooledTransaction> = Pool<
TransactionValidationTaskExecutor<OpTransactionValidator<Client, T>>,
CoinbaseTipOrdering<T>,
S,
>;

View File

@@ -316,8 +316,7 @@ mod tests {
use alloy_primitives::{TxKind, U256};
use op_alloy_consensus::TxDeposit;
use reth_optimism_chainspec::OP_MAINNET;
use reth_optimism_evm::OpEvmConfig;
use reth_optimism_primitives::{OpPrimitives, OpTransactionSigned};
use reth_optimism_primitives::OpTransactionSigned;
use reth_provider::test_utils::MockEthProvider;
use reth_transaction_pool::{
blobstore::InMemoryBlobStore, validate::EthTransactionValidatorBuilder, TransactionOrigin,
@@ -325,11 +324,8 @@ mod tests {
};
#[tokio::test]
async fn validate_optimism_transaction() {
let client = MockEthProvider::<OpPrimitives>::new()
.with_chain_spec(OP_MAINNET.clone())
.with_genesis_block();
let evm_config = OpEvmConfig::optimism(OP_MAINNET.clone());
let validator = EthTransactionValidatorBuilder::new(client, evm_config)
let client = MockEthProvider::default().with_chain_spec(OP_MAINNET.clone());
let validator = EthTransactionValidatorBuilder::new(client)
.no_shanghai()
.no_cancun()
.build(InMemoryBlobStore::default());

View File

@@ -3,12 +3,10 @@ use alloy_consensus::{BlockHeader, Transaction};
use op_revm::L1BlockInfo;
use parking_lot::RwLock;
use reth_chainspec::ChainSpecProvider;
use reth_evm::ConfigureEvm;
use reth_optimism_evm::RethL1BlockInfo;
use reth_optimism_forks::OpHardforks;
use reth_primitives_traits::{
transaction::error::InvalidTransactionError, Block, BlockBody, BlockTy, GotExpected,
SealedBlock,
transaction::error::InvalidTransactionError, Block, BlockBody, GotExpected, SealedBlock,
};
use reth_storage_api::{AccountInfoReader, BlockReaderIdExt, StateProviderFactory};
use reth_transaction_pool::{
@@ -41,9 +39,9 @@ impl OpL1BlockInfo {
/// Validator for Optimism transactions.
#[derive(Debug, Clone)]
pub struct OpTransactionValidator<Client, Tx, Evm> {
pub struct OpTransactionValidator<Client, Tx> {
/// The type that performs the actual validation.
inner: Arc<EthTransactionValidator<Client, Tx, Evm>>,
inner: Arc<EthTransactionValidator<Client, Tx>>,
/// Additional block info required for validation.
block_info: Arc<OpL1BlockInfo>,
/// If true, ensure that the transaction's sender has enough balance to cover the L1 gas fee
@@ -56,7 +54,7 @@ pub struct OpTransactionValidator<Client, Tx, Evm> {
fork_tracker: Arc<OpForkTracker>,
}
impl<Client, Tx, Evm> OpTransactionValidator<Client, Tx, Evm> {
impl<Client, Tx> OpTransactionValidator<Client, Tx> {
/// Returns the configured chain spec
pub fn chain_spec(&self) -> Arc<Client::ChainSpec>
where
@@ -88,15 +86,14 @@ impl<Client, Tx, Evm> OpTransactionValidator<Client, Tx, Evm> {
}
}
impl<Client, Tx, Evm> OpTransactionValidator<Client, Tx, Evm>
impl<Client, Tx> OpTransactionValidator<Client, Tx>
where
Client:
ChainSpecProvider<ChainSpec: OpHardforks> + StateProviderFactory + BlockReaderIdExt + Sync,
Tx: EthPoolTransaction + OpPooledTx,
Evm: ConfigureEvm,
{
/// Create a new [`OpTransactionValidator`].
pub fn new(inner: EthTransactionValidator<Client, Tx, Evm>) -> Self {
pub fn new(inner: EthTransactionValidator<Client, Tx>) -> Self {
let this = Self::with_block_info(inner, OpL1BlockInfo::default());
if let Ok(Some(block)) =
this.inner.client().block_by_number_or_tag(alloy_eips::BlockNumberOrTag::Latest)
@@ -115,7 +112,7 @@ where
/// Create a new [`OpTransactionValidator`] with the given [`OpL1BlockInfo`].
pub fn with_block_info(
inner: EthTransactionValidator<Client, Tx, Evm>,
inner: EthTransactionValidator<Client, Tx>,
block_info: OpL1BlockInfo,
) -> Self {
Self {
@@ -291,15 +288,13 @@ where
}
}
impl<Client, Tx, Evm> TransactionValidator for OpTransactionValidator<Client, Tx, Evm>
impl<Client, Tx> TransactionValidator for OpTransactionValidator<Client, Tx>
where
Client:
ChainSpecProvider<ChainSpec: OpHardforks> + StateProviderFactory + BlockReaderIdExt + Sync,
Tx: EthPoolTransaction + OpPooledTx,
Evm: ConfigureEvm,
{
type Transaction = Tx;
type Block = BlockTy<Evm::Primitives>;
async fn validate_transaction(
&self,
@@ -330,7 +325,10 @@ where
.await
}
fn on_new_head_block(&self, new_tip_block: &SealedBlock<Self::Block>) {
fn on_new_head_block<B>(&self, new_tip_block: &SealedBlock<B>)
where
B: Block,
{
self.inner.on_new_head_block(new_tip_block);
self.update_l1_block_info(
new_tip_block.header(),

View File

@@ -798,14 +798,12 @@ mod rpc_compat {
.zip(senders)
.enumerate()
.map(|(idx, (tx, sender))| {
#[allow(clippy::needless_update)]
let tx_info = TransactionInfo {
hash: Some(*tx.tx_hash()),
block_hash,
block_number: Some(block_number),
base_fee,
index: Some(idx as u64),
..Default::default()
};
converter(Recovered::new_unchecked(tx, sender), tx_info)

View File

@@ -48,16 +48,6 @@ pub trait SignedTransaction:
+ TxHashRef
+ IsTyped2718
{
/// Returns whether this is a system transaction.
///
/// System transactions are created at the protocol level rather than by users. They are
/// typically used by L2s for special purposes (e.g., Optimism deposit transactions with type
/// 126) and may have different validation rules or fee handling compared to standard
/// user-initiated transactions.
fn is_system_tx(&self) -> bool {
false
}
/// Returns whether this transaction type can be __broadcasted__ as full transaction over the
/// network.
///

View File

@@ -17,7 +17,6 @@ reth-exex-types.workspace = true
reth-db-api.workspace = true
reth-errors.workspace = true
reth-provider.workspace = true
reth-storage-api.workspace = true
reth-tokio-util.workspace = true
reth-config.workspace = true
reth-prune-types.workspace = true
@@ -42,11 +41,15 @@ rayon.workspace = true
tokio.workspace = true
rustc-hash.workspace = true
[features]
rocksdb = ["reth-provider/rocksdb"]
[dev-dependencies]
# reth
reth-db = { workspace = true, features = ["test-utils"] }
reth-stages = { workspace = true, features = ["test-utils"] }
reth-primitives-traits = { workspace = true, features = ["arbitrary"] }
reth-storage-api.workspace = true
reth-testing-utils.workspace = true
reth-tracing.workspace = true

View File

@@ -10,7 +10,6 @@ use reth_provider::{
StageCheckpointReader, StaticFileProviderFactory, StorageSettingsCache,
};
use reth_prune_types::PruneModes;
use reth_storage_api::{ChangeSetReader, StorageChangeSetReader};
use std::time::Duration;
use tokio::sync::watch;
@@ -83,8 +82,6 @@ impl PrunerBuilder {
+ ChainStateBlockReader
+ StorageSettingsCache
+ StageCheckpointReader
+ ChangeSetReader
+ StorageChangeSetReader
+ StaticFileProviderFactory<
Primitives: NodePrimitives<SignedTx: Value, Receipt: Value, BlockHeader: Value>,
>,
@@ -119,9 +116,7 @@ impl PrunerBuilder {
+ PruneCheckpointWriter
+ PruneCheckpointReader
+ StorageSettingsCache
+ StageCheckpointReader
+ ChangeSetReader
+ StorageChangeSetReader,
+ StageCheckpointReader,
{
let segments = SegmentSet::<Provider>::from_components(static_file_provider, self.segments);

View File

@@ -20,6 +20,8 @@ pub use user::{
AccountHistory, Bodies, Receipts as UserReceipts, ReceiptsByLogs, SenderRecovery,
StorageHistory, TransactionLookup,
};
#[cfg(all(unix, feature = "rocksdb"))]
pub use user::{AccountsHistoryPruner, StoragesHistoryPruner};
/// Prunes data from static files for a given segment.
///

View File

@@ -10,7 +10,6 @@ use reth_provider::{
PruneCheckpointReader, PruneCheckpointWriter, StaticFileProviderFactory, StorageSettingsCache,
};
use reth_prune_types::PruneModes;
use reth_storage_api::{ChangeSetReader, StorageChangeSetReader};
/// Collection of [`Segment`]. Thread-safe, allocated on the heap.
#[derive(Debug)]
@@ -53,9 +52,7 @@ where
+ PruneCheckpointReader
+ BlockReader<Transaction: Encodable2718>
+ ChainStateBlockReader
+ StorageSettingsCache
+ ChangeSetReader
+ StorageChangeSetReader,
+ StorageSettingsCache,
{
/// Creates a [`SegmentSet`] from an existing components, such as [`StaticFileProvider`] and
/// [`PruneModes`].

View File

@@ -1,22 +1,14 @@
use crate::{
db_ext::DbTxPruneExt,
segments::{
user::history::{finalize_history_prune, HistoryPruneResult},
PruneInput, Segment,
},
segments::{user::history::prune_history_indices, PruneInput, Segment},
PrunerError,
};
use alloy_primitives::BlockNumber;
use itertools::Itertools;
use reth_db_api::{models::ShardedKey, tables, transaction::DbTxMut};
use reth_provider::{
changeset_walker::StaticFileAccountChangesetWalker, DBProvider, EitherWriter,
StaticFileProviderFactory, StorageSettingsCache,
};
use reth_provider::DBProvider;
use reth_prune_types::{
PruneMode, PrunePurpose, PruneSegment, SegmentOutput, SegmentOutputCheckpoint,
};
use reth_static_file_types::StaticFileSegment;
use reth_storage_api::ChangeSetReader;
use rustc_hash::FxHashMap;
use tracing::{instrument, trace};
@@ -39,10 +31,7 @@ impl AccountHistory {
impl<Provider> Segment<Provider> for AccountHistory
where
Provider: DBProvider<Tx: DbTxMut>
+ StaticFileProviderFactory
+ StorageSettingsCache
+ ChangeSetReader,
Provider: DBProvider<Tx: DbTxMut>,
{
fn segment(&self) -> PruneSegment {
PruneSegment::AccountHistory
@@ -67,33 +56,11 @@ where
};
let range_end = *range.end();
// Check where account changesets are stored
if EitherWriter::account_changesets_destination(provider).is_static_file() {
self.prune_static_files(provider, input, range, range_end)
} else {
self.prune_database(provider, input, range, range_end)
}
}
}
impl AccountHistory {
/// Prunes account history when changesets are stored in static files.
fn prune_static_files<Provider>(
&self,
provider: &Provider,
input: PruneInput,
range: std::ops::RangeInclusive<BlockNumber>,
range_end: BlockNumber,
) -> Result<SegmentOutput, PrunerError>
where
Provider: DBProvider<Tx: DbTxMut> + StaticFileProviderFactory + ChangeSetReader,
{
let mut limiter = if let Some(limit) = input.limiter.deleted_entries_limit() {
input.limiter.set_deleted_entries_limit(limit / ACCOUNT_HISTORY_TABLES_TO_PRUNE)
} else {
input.limiter
};
if limiter.is_limit_reached() {
return Ok(SegmentOutput::not_done(
limiter.interrupt_reason(),
@@ -101,86 +68,15 @@ impl AccountHistory {
))
}
// The size of this map it's limited by `prune_delete_limit * blocks_since_last_run /
// ACCOUNT_HISTORY_TABLES_TO_PRUNE`, and with the current defaults it's usually `3500 * 5 /
// 2`, so 8750 entries. Each entry is `160 bit + 64 bit`, so the total size should be up to
// ~0.25MB + some hashmap overhead. `blocks_since_last_run` is additionally limited by the
// `max_reorg_depth`, so no OOM is expected here.
let mut highest_deleted_accounts = FxHashMap::default();
let mut last_changeset_pruned_block = None;
let mut pruned_changesets = 0;
let mut done = true;
let walker = StaticFileAccountChangesetWalker::new(provider, range);
for result in walker {
if limiter.is_limit_reached() {
done = false;
break;
}
let (block_number, changeset) = result?;
highest_deleted_accounts.insert(changeset.address, block_number);
last_changeset_pruned_block = Some(block_number);
pruned_changesets += 1;
limiter.increment_deleted_entries_count();
}
// Delete static file jars below the pruned block
if let Some(last_block) = last_changeset_pruned_block {
provider
.static_file_provider()
.delete_segment_below_block(StaticFileSegment::AccountChangeSets, last_block + 1)?;
}
trace!(target: "pruner", pruned = %pruned_changesets, %done, "Pruned account history (changesets from static files)");
let result = HistoryPruneResult {
highest_deleted: highest_deleted_accounts,
last_pruned_block: last_changeset_pruned_block,
pruned_count: pruned_changesets,
done,
};
finalize_history_prune::<_, tables::AccountsHistory, _, _>(
provider,
result,
range_end,
&limiter,
ShardedKey::new,
|a, b| a.key == b.key,
)
.map_err(Into::into)
}
fn prune_database<Provider>(
&self,
provider: &Provider,
input: PruneInput,
range: std::ops::RangeInclusive<BlockNumber>,
range_end: BlockNumber,
) -> Result<SegmentOutput, PrunerError>
where
Provider: DBProvider<Tx: DbTxMut>,
{
let mut limiter = if let Some(limit) = input.limiter.deleted_entries_limit() {
input.limiter.set_deleted_entries_limit(limit / ACCOUNT_HISTORY_TABLES_TO_PRUNE)
} else {
input.limiter
};
if limiter.is_limit_reached() {
return Ok(SegmentOutput::not_done(
limiter.interrupt_reason(),
input.previous_checkpoint.map(SegmentOutputCheckpoint::from_prune_checkpoint),
))
}
// Deleted account changeset keys (account addresses) with the highest block number deleted
// for that key.
//
// The size of this map is limited by `prune_delete_limit * blocks_since_last_run /
// ACCOUNT_HISTORY_TABLES_TO_PRUNE`, and with the current defaults it's usually `3500 * 5 /
// 2`, so 8750 entries. Each entry is `160 bit + 64 bit`, so the total size should be up to
// ~0.25MB + some hashmap overhead. `blocks_since_last_run` is additionally limited by the
// `max_reorg_depth`, so no OOM is expected here.
let mut last_changeset_pruned_block = None;
// The size of this map it's limited by `prune_delete_limit * blocks_since_last_run /
// ACCOUNT_HISTORY_TABLES_TO_PRUNE`, and with current default it's usually `3500 * 5
// / 2`, so 8750 entries. Each entry is `160 bit + 256 bit + 64 bit`, so the total
// size should be up to 0.5MB + some hashmap overhead. `blocks_since_last_run` is
// additionally limited by the `max_reorg_depth`, so no OOM is expected here.
let mut highest_deleted_accounts = FxHashMap::default();
let (pruned_changesets, done) =
provider.tx_ref().prune_table_with_range::<tables::AccountChangeSets>(
@@ -192,52 +88,69 @@ impl AccountHistory {
last_changeset_pruned_block = Some(block_number);
},
)?;
trace!(target: "pruner", pruned = %pruned_changesets, %done, "Pruned account history (changesets from database)");
trace!(target: "pruner", pruned = %pruned_changesets, %done, "Pruned account history (changesets)");
let result = HistoryPruneResult {
highest_deleted: highest_deleted_accounts,
last_pruned_block: last_changeset_pruned_block,
pruned_count: pruned_changesets,
done,
};
finalize_history_prune::<_, tables::AccountsHistory, _, _>(
let last_changeset_pruned_block = last_changeset_pruned_block
// If there's more account changesets to prune, set the checkpoint block number to
// previous, so we could finish pruning its account changesets on the next run.
.map(|block_number| if done { block_number } else { block_number.saturating_sub(1) })
.unwrap_or(range_end);
// Sort highest deleted block numbers by account address and turn them into sharded keys.
// We did not use `BTreeMap` from the beginning, because it's inefficient for hashes.
let highest_sharded_keys = highest_deleted_accounts
.into_iter()
.sorted_unstable() // Unstable is fine because no equal keys exist in the map
.map(|(address, block_number)| {
ShardedKey::new(address, block_number.min(last_changeset_pruned_block))
});
let outcomes = prune_history_indices::<Provider, tables::AccountsHistory, _>(
provider,
result,
range_end,
&limiter,
ShardedKey::new,
highest_sharded_keys,
|a, b| a.key == b.key,
)
.map_err(Into::into)
)?;
trace!(target: "pruner", ?outcomes, %done, "Pruned account history (indices)");
let progress = limiter.progress(done);
Ok(SegmentOutput {
progress,
pruned: pruned_changesets + outcomes.deleted,
checkpoint: Some(SegmentOutputCheckpoint {
block_number: Some(last_changeset_pruned_block),
tx_number: None,
}),
})
}
}
#[cfg(test)]
mod tests {
use super::ACCOUNT_HISTORY_TABLES_TO_PRUNE;
use crate::segments::{AccountHistory, PruneInput, PruneLimiter, Segment, SegmentOutput};
use crate::segments::{
user::account_history::ACCOUNT_HISTORY_TABLES_TO_PRUNE, AccountHistory, PruneInput,
PruneLimiter, Segment, SegmentOutput,
};
use alloy_primitives::{BlockNumber, B256};
use assert_matches::assert_matches;
use reth_db_api::{models::StorageSettings, tables, BlockNumberList};
use reth_db_api::{tables, BlockNumberList};
use reth_provider::{DBProvider, DatabaseProviderFactory, PruneCheckpointReader};
use reth_prune_types::{
PruneCheckpoint, PruneInterruptReason, PruneMode, PruneProgress, PruneSegment,
};
use reth_stages::test_utils::{StorageKind, TestStageDB};
use reth_storage_api::StorageSettingsCache;
use reth_testing_utils::generators::{
self, random_block_range, random_changeset_range, random_eoa_accounts, BlockRangeParams,
};
use std::{collections::BTreeMap, ops::AddAssign};
#[test]
fn prune_legacy() {
fn prune() {
let db = TestStageDB::default();
let mut rng = generators::rng();
let blocks = random_block_range(
&mut rng,
0..=5000,
1..=5000,
BlockRangeParams { parent: Some(B256::ZERO), tx_count: 0..1, ..Default::default() },
);
db.insert_blocks(blocks.iter(), StorageKind::Database(None)).expect("insert blocks");
@@ -289,9 +202,6 @@ mod tests {
let segment = AccountHistory::new(prune_mode);
let provider = db.factory.database_provider_rw().unwrap();
provider.set_storage_settings_cache(
StorageSettings::default().with_account_changesets_in_static_files(false),
);
let result = segment.prune(&provider, input).unwrap();
limiter.increment_deleted_entries_count_by(result.pruned);
@@ -329,18 +239,20 @@ mod tests {
.map(|(i, _)| i)
.unwrap_or_default();
// Skip what we've pruned so far, subtracting one to get last pruned block number
// further down
let mut pruned_changesets = changesets.iter().skip(pruned.saturating_sub(1));
let mut pruned_changesets = changesets
.iter()
// Skip what we've pruned so far, subtracting one to get last pruned block
// number further down
.skip(pruned.saturating_sub(1));
let last_pruned_block_number = pruned_changesets
.next()
.map(|(block_number, _)| if result.progress.is_finished() {
*block_number
} else {
block_number.saturating_sub(1)
} as BlockNumber)
.unwrap_or(to_block);
.next()
.map(|(block_number, _)| if result.progress.is_finished() {
*block_number
} else {
block_number.saturating_sub(1)
} as BlockNumber)
.unwrap_or(to_block);
let pruned_changesets = pruned_changesets.fold(
BTreeMap::<_, Vec<_>>::new(),
@@ -391,152 +303,4 @@ mod tests {
test_prune(998, 2, (PruneProgress::Finished, 998));
test_prune(1400, 3, (PruneProgress::Finished, 804));
}
#[test]
fn prune_static_file() {
let db = TestStageDB::default();
let mut rng = generators::rng();
let blocks = random_block_range(
&mut rng,
0..=5000,
BlockRangeParams { parent: Some(B256::ZERO), tx_count: 0..1, ..Default::default() },
);
db.insert_blocks(blocks.iter(), StorageKind::Database(None)).expect("insert blocks");
let accounts = random_eoa_accounts(&mut rng, 2).into_iter().collect::<BTreeMap<_, _>>();
let (changesets, _) = random_changeset_range(
&mut rng,
blocks.iter(),
accounts.into_iter().map(|(addr, acc)| (addr, (acc, Vec::new()))),
0..0,
0..0,
);
db.insert_changesets_to_static_files(changesets.clone(), None)
.expect("insert changesets to static files");
db.insert_history(changesets.clone(), None).expect("insert history");
let account_occurrences = db.table::<tables::AccountsHistory>().unwrap().into_iter().fold(
BTreeMap::<_, usize>::new(),
|mut map, (key, _)| {
map.entry(key.key).or_default().add_assign(1);
map
},
);
assert!(account_occurrences.into_iter().any(|(_, occurrences)| occurrences > 1));
let original_shards = db.table::<tables::AccountsHistory>().unwrap();
let test_prune =
|to_block: BlockNumber, run: usize, expected_result: (PruneProgress, usize)| {
let prune_mode = PruneMode::Before(to_block);
let deleted_entries_limit = 2000;
let mut limiter =
PruneLimiter::default().set_deleted_entries_limit(deleted_entries_limit);
let input = PruneInput {
previous_checkpoint: db
.factory
.provider()
.unwrap()
.get_prune_checkpoint(PruneSegment::AccountHistory)
.unwrap(),
to_block,
limiter: limiter.clone(),
};
let segment = AccountHistory::new(prune_mode);
let provider = db.factory.database_provider_rw().unwrap();
provider.set_storage_settings_cache(
StorageSettings::default().with_account_changesets_in_static_files(true),
);
let result = segment.prune(&provider, input).unwrap();
limiter.increment_deleted_entries_count_by(result.pruned);
assert_matches!(
result,
SegmentOutput {progress, pruned, checkpoint: Some(_)}
if (progress, pruned) == expected_result
);
segment
.save_checkpoint(
&provider,
result.checkpoint.unwrap().as_prune_checkpoint(prune_mode),
)
.unwrap();
provider.commit().expect("commit");
let changesets = changesets
.iter()
.enumerate()
.flat_map(|(block_number, changeset)| {
changeset.iter().map(move |change| (block_number, change))
})
.collect::<Vec<_>>();
#[expect(clippy::skip_while_next)]
let pruned = changesets
.iter()
.enumerate()
.skip_while(|(i, (block_number, _))| {
*i < deleted_entries_limit / ACCOUNT_HISTORY_TABLES_TO_PRUNE * run &&
*block_number <= to_block as usize
})
.next()
.map(|(i, _)| i)
.unwrap_or_default();
// Skip what we've pruned so far, subtracting one to get last pruned block number
// further down
let mut pruned_changesets = changesets.iter().skip(pruned.saturating_sub(1));
let last_pruned_block_number = pruned_changesets
.next()
.map(|(block_number, _)| {
(if result.progress.is_finished() {
*block_number
} else {
block_number.saturating_sub(1)
}) as BlockNumber
})
.unwrap_or(to_block);
let actual_shards = db.table::<tables::AccountsHistory>().unwrap();
let expected_shards = original_shards
.iter()
.filter(|(key, _)| key.highest_block_number > last_pruned_block_number)
.map(|(key, blocks)| {
let new_blocks =
blocks.iter().skip_while(|block| *block <= last_pruned_block_number);
(key.clone(), BlockNumberList::new_pre_sorted(new_blocks))
})
.collect::<Vec<_>>();
assert_eq!(actual_shards, expected_shards);
assert_eq!(
db.factory
.provider()
.unwrap()
.get_prune_checkpoint(PruneSegment::AccountHistory)
.unwrap(),
Some(PruneCheckpoint {
block_number: Some(last_pruned_block_number),
tx_number: None,
prune_mode
})
);
};
test_prune(
998,
1,
(PruneProgress::HasMoreData(PruneInterruptReason::DeletedEntriesLimitReached), 1000),
);
test_prune(998, 2, (PruneProgress::Finished, 1000));
test_prune(1400, 3, (PruneProgress::Finished, 804));
}
}

View File

@@ -1,6 +1,4 @@
use crate::PruneLimiter;
use alloy_primitives::BlockNumber;
use itertools::Itertools;
use reth_db_api::{
cursor::{DbCursorRO, DbCursorRW},
models::ShardedKey,
@@ -9,8 +7,6 @@ use reth_db_api::{
BlockNumberList, DatabaseError, RawKey, RawTable, RawValue,
};
use reth_provider::DBProvider;
use reth_prune_types::{SegmentOutput, SegmentOutputCheckpoint};
use rustc_hash::FxHashMap;
enum PruneShardOutcome {
Deleted,
@@ -25,65 +21,6 @@ pub(crate) struct PrunedIndices {
pub(crate) unchanged: usize,
}
/// Result of pruning history changesets, used to build the final output.
pub(crate) struct HistoryPruneResult<K> {
/// Map of the highest deleted changeset keys to their block numbers.
pub(crate) highest_deleted: FxHashMap<K, BlockNumber>,
/// The last block number that had changesets pruned.
pub(crate) last_pruned_block: Option<BlockNumber>,
/// Number of changesets pruned.
pub(crate) pruned_count: usize,
/// Whether pruning is complete.
pub(crate) done: bool,
}
/// Finalizes history pruning by sorting sharded keys, pruning history indices, and building output.
///
/// This is shared between static file and database pruning for both account and storage history.
pub(crate) fn finalize_history_prune<Provider, T, K, SK>(
provider: &Provider,
result: HistoryPruneResult<K>,
range_end: BlockNumber,
limiter: &PruneLimiter,
to_sharded_key: impl Fn(K, BlockNumber) -> T::Key,
key_matches: impl Fn(&T::Key, &T::Key) -> bool,
) -> Result<SegmentOutput, DatabaseError>
where
Provider: DBProvider<Tx: DbTxMut>,
T: Table<Value = BlockNumberList>,
T::Key: AsRef<ShardedKey<SK>>,
K: Ord,
{
let HistoryPruneResult { highest_deleted, last_pruned_block, pruned_count, done } = result;
// If there's more changesets to prune, set the checkpoint block number to previous,
// so we could finish pruning its changesets on the next run.
let last_changeset_pruned_block = last_pruned_block
.map(|block_number| if done { block_number } else { block_number.saturating_sub(1) })
.unwrap_or(range_end);
// Sort highest deleted block numbers and turn them into sharded keys.
// We use `sorted_unstable` because no equal keys exist in the map.
let highest_sharded_keys =
highest_deleted.into_iter().sorted_unstable().map(|(key, block_number)| {
to_sharded_key(key, block_number.min(last_changeset_pruned_block))
});
let outcomes =
prune_history_indices::<Provider, T, _>(provider, highest_sharded_keys, key_matches)?;
let progress = limiter.progress(done);
Ok(SegmentOutput {
progress,
pruned: pruned_count + outcomes.deleted,
checkpoint: Some(SegmentOutputCheckpoint {
block_number: Some(last_changeset_pruned_block),
tx_number: None,
}),
})
}
/// Prune history indices according to the provided list of highest sharded keys.
///
/// Returns total number of deleted, updated and unchanged entities.

View File

@@ -3,6 +3,10 @@ mod bodies;
mod history;
mod receipts;
mod receipts_by_logs;
#[cfg(all(unix, feature = "rocksdb"))]
mod rocksdb_account_history;
#[cfg(all(unix, feature = "rocksdb"))]
mod rocksdb_storage_history;
mod sender_recovery;
mod storage_history;
mod transaction_lookup;
@@ -11,6 +15,10 @@ pub use account_history::AccountHistory;
pub use bodies::Bodies;
pub use receipts::Receipts;
pub use receipts_by_logs::ReceiptsByLogs;
#[cfg(all(unix, feature = "rocksdb"))]
pub use rocksdb_account_history::AccountsHistoryPruner;
#[cfg(all(unix, feature = "rocksdb"))]
pub use rocksdb_storage_history::StoragesHistoryPruner;
pub use sender_recovery::SenderRecovery;
pub use storage_history::StorageHistory;
pub use transaction_lookup::TransactionLookup;

View File

@@ -0,0 +1,557 @@
//! `RocksDB` account history index pruner.
use crate::{
segments::{PruneInput, Segment},
PrunerError,
};
use reth_provider::{
ChangeSetReader, DBProvider, EitherWriter, PruneShardOutcome, RocksDBProviderFactory,
StaticFileProviderFactory, StorageSettingsCache,
};
use reth_prune_types::{
PruneMode, PrunePurpose, PruneSegment, SegmentOutput, SegmentOutputCheckpoint,
};
use reth_static_file_types::StaticFileSegment;
use rustc_hash::FxHashMap;
use std::ops::RangeInclusive;
use tracing::{instrument, trace};
#[derive(Debug)]
pub struct AccountsHistoryPruner {
mode: PruneMode,
}
impl AccountsHistoryPruner {
pub const fn new(mode: PruneMode) -> Self {
Self { mode }
}
}
impl AccountsHistoryPruner {
fn prune_static_files<Provider>(
&self,
provider: &Provider,
range: RangeInclusive<u64>,
range_end: u64,
input: PruneInput,
) -> Result<SegmentOutput, PrunerError>
where
Provider: ChangeSetReader + RocksDBProviderFactory + StaticFileProviderFactory,
{
let mut limiter = input.limiter;
if limiter.is_limit_reached() {
return Ok(SegmentOutput::not_done(
limiter.interrupt_reason(),
input.previous_checkpoint.map(SegmentOutputCheckpoint::from_prune_checkpoint),
))
}
let mut highest_deleted_accounts = FxHashMap::default();
let mut last_changeset_pruned_block = None;
let mut scanned_changesets = 0usize;
let mut done = true;
for block in range {
let changes = provider.account_block_changeset(block)?;
let changes_count = changes.len();
for change in changes {
highest_deleted_accounts.insert(change.address, block);
}
scanned_changesets += changes_count;
limiter.increment_deleted_entries_count_by(changes_count);
last_changeset_pruned_block = Some(block);
if limiter.is_limit_reached() {
done = false;
break;
}
}
trace!(target: "pruner", scanned = %scanned_changesets, %done, "Scanned account changesets");
let last_changeset_pruned_block = last_changeset_pruned_block.unwrap_or(range_end);
let mut keys_deleted = 0usize;
let mut keys_updated = 0usize;
provider.with_rocksdb_batch(|mut batch| {
for (address, highest_block) in &highest_deleted_accounts {
let prune_to = (*highest_block).min(last_changeset_pruned_block);
match batch.prune_account_history_to(*address, prune_to)? {
PruneShardOutcome::Deleted => keys_deleted += 1,
PruneShardOutcome::Updated => keys_updated += 1,
PruneShardOutcome::Unchanged => {}
}
}
Ok(((), Some(batch.into_inner())))
})?;
trace!(target: "pruner", keys_deleted, keys_updated, %done, "Pruned account history (RocksDB indices)");
if done {
provider.static_file_provider().delete_segment_below_block(
StaticFileSegment::AccountChangeSets,
last_changeset_pruned_block + 1,
)?;
}
let progress = limiter.progress(done);
Ok(SegmentOutput {
progress,
pruned: scanned_changesets + keys_deleted,
checkpoint: Some(SegmentOutputCheckpoint {
block_number: Some(last_changeset_pruned_block),
tx_number: None,
}),
})
}
fn prune_database<Provider>(
&self,
provider: &Provider,
range: RangeInclusive<u64>,
range_end: u64,
input: PruneInput,
) -> Result<SegmentOutput, PrunerError>
where
Provider: ChangeSetReader + RocksDBProviderFactory,
{
let mut limiter = input.limiter;
if limiter.is_limit_reached() {
return Ok(SegmentOutput::not_done(
limiter.interrupt_reason(),
input.previous_checkpoint.map(SegmentOutputCheckpoint::from_prune_checkpoint),
))
}
let mut highest_deleted_accounts = FxHashMap::default();
let mut last_changeset_pruned_block = None;
let mut scanned_changesets = 0usize;
let mut done = true;
for block in range {
let changes = provider.account_block_changeset(block)?;
let changes_count = changes.len();
for change in changes {
highest_deleted_accounts.insert(change.address, block);
}
scanned_changesets += changes_count;
limiter.increment_deleted_entries_count_by(changes_count);
last_changeset_pruned_block = Some(block);
if limiter.is_limit_reached() {
done = false;
break;
}
}
trace!(target: "pruner", scanned = %scanned_changesets, %done, "Scanned account changesets");
let last_changeset_pruned_block = last_changeset_pruned_block.unwrap_or(range_end);
let mut keys_deleted = 0usize;
let mut keys_updated = 0usize;
provider.with_rocksdb_batch(|mut batch| {
for (address, highest_block) in &highest_deleted_accounts {
let prune_to = (*highest_block).min(last_changeset_pruned_block);
match batch.prune_account_history_to(*address, prune_to)? {
PruneShardOutcome::Deleted => keys_deleted += 1,
PruneShardOutcome::Updated => keys_updated += 1,
PruneShardOutcome::Unchanged => {}
}
}
Ok(((), Some(batch.into_inner())))
})?;
trace!(target: "pruner", keys_deleted, keys_updated, %done, "Pruned account history (RocksDB indices)");
let progress = limiter.progress(done);
Ok(SegmentOutput {
progress,
pruned: scanned_changesets + keys_deleted,
checkpoint: Some(SegmentOutputCheckpoint {
block_number: Some(last_changeset_pruned_block),
tx_number: None,
}),
})
}
}
impl<Provider> Segment<Provider> for AccountsHistoryPruner
where
Provider: ChangeSetReader
+ RocksDBProviderFactory
+ StaticFileProviderFactory
+ StorageSettingsCache
+ DBProvider,
{
fn segment(&self) -> PruneSegment {
PruneSegment::AccountHistory
}
fn mode(&self) -> Option<PruneMode> {
Some(self.mode)
}
fn purpose(&self) -> PrunePurpose {
PrunePurpose::User
}
#[instrument(target = "pruner", skip(self, provider), ret(level = "trace"))]
fn prune(&self, provider: &Provider, input: PruneInput) -> Result<SegmentOutput, PrunerError> {
let range = match input.get_next_block_range() {
Some(range) => range,
None => {
trace!(target: "pruner", "No account history to prune");
return Ok(SegmentOutput::done())
}
};
let range_end = *range.end();
if EitherWriter::account_changesets_destination(provider).is_static_file() {
self.prune_static_files(provider, range, range_end, input)
} else {
self.prune_database(provider, range, range_end, input)
}
}
}
#[cfg(test)]
mod tests {
use super::*;
use crate::segments::{PruneInput, PruneLimiter, Segment};
use alloy_primitives::{Address, B256};
use reth_db_api::{
models::{AccountBeforeTx, ShardedKey},
tables,
transaction::DbTxMut,
BlockNumberList,
};
use reth_provider::{
DatabaseProviderFactory, RocksDBProviderFactory, StaticFileProviderFactory,
StaticFileWriter,
};
use reth_prune_types::{PruneMode, PruneProgress};
use reth_stages::test_utils::{StorageKind, TestStageDB};
use reth_static_file_types::StaticFileSegment;
use reth_storage_api::{DBProvider, StorageSettings, StorageSettingsCache};
use reth_testing_utils::generators::{
self, random_block_range, random_changeset_range, random_eoa_accounts, BlockRangeParams,
};
use std::collections::BTreeMap;
fn setup_rocksdb_test_db() -> TestStageDB {
let db = TestStageDB::default();
db.factory.set_storage_settings_cache(
StorageSettings::legacy().with_account_history_in_rocksdb(true),
);
db
}
fn generate_test_changeset(_block: u64, addresses: &[Address]) -> Vec<AccountBeforeTx> {
addresses.iter().map(|&address| AccountBeforeTx { address, info: None }).collect()
}
/// Helper to write account history to RocksDB.
fn write_account_history_to_rocksdb(db: &TestStageDB, history: &[(Address, Vec<u64>)]) {
let provider = db.factory.database_provider_rw().unwrap();
provider
.with_rocksdb_batch(|mut batch| {
for (address, blocks) in history {
let shard = BlockNumberList::new_pre_sorted(blocks.iter().copied());
batch
.put::<tables::AccountsHistory>(ShardedKey::last(*address), &shard)
.unwrap();
}
Ok(((), Some(batch.into_inner())))
})
.unwrap();
provider.commit().unwrap();
}
#[test]
fn test_prune_account_history_static_files_full_range() {
let db = setup_rocksdb_test_db();
let mut rng = generators::rng();
let blocks = random_block_range(
&mut rng,
0..=10,
BlockRangeParams { parent: Some(B256::ZERO), tx_count: 0..1, ..Default::default() },
);
db.insert_blocks(blocks.iter(), StorageKind::Database(None)).expect("insert blocks");
let addresses: Vec<Address> = (0..3)
.map(|i| {
let mut addr = Address::ZERO;
addr.0[0] = i as u8;
addr
})
.collect();
let static_file_provider = db.factory.static_file_provider();
{
let mut writer = static_file_provider
.latest_writer(StaticFileSegment::AccountChangeSets)
.expect("get writer");
for block_num in 0..=10 {
let changeset = generate_test_changeset(block_num, &addresses);
writer.append_account_changeset(changeset, block_num).expect("append changeset");
}
writer.commit().expect("commit static file");
}
let highest = static_file_provider
.get_highest_static_file_block(StaticFileSegment::AccountChangeSets);
assert_eq!(highest, Some(10), "Static file should cover blocks 0-10");
// Write account history to RocksDB (not MDBX)
let history: Vec<_> = addresses.iter().map(|&addr| (addr, (0..=10).collect())).collect();
write_account_history_to_rocksdb(&db, &history);
let prune_mode = PruneMode::Before(5);
let input =
PruneInput { previous_checkpoint: None, to_block: 5, limiter: PruneLimiter::default() };
let segment = AccountsHistoryPruner::new(prune_mode);
let provider = db.factory.database_provider_rw().unwrap();
let result = segment.prune(&provider, input).expect("prune should succeed");
provider.commit().expect("commit after prune");
assert_eq!(result.progress, PruneProgress::Finished);
assert!(result.pruned > 0, "Should have pruned some entries");
let checkpoint = result.checkpoint.expect("should have checkpoint");
assert_eq!(checkpoint.block_number, Some(5));
}
#[test]
fn test_prune_account_history_mdbx_fallback() {
let db = setup_rocksdb_test_db();
let mut rng = generators::rng();
let blocks = random_block_range(
&mut rng,
1..=10,
BlockRangeParams { parent: Some(B256::ZERO), tx_count: 0..1, ..Default::default() },
);
db.insert_blocks(blocks.iter(), StorageKind::Database(None)).expect("insert blocks");
let accounts = random_eoa_accounts(&mut rng, 2).into_iter().collect::<BTreeMap<_, _>>();
let (changesets, _) = random_changeset_range(
&mut rng,
blocks.iter(),
accounts.into_iter().map(|(addr, acc)| (addr, (acc, Vec::new()))),
0..0,
0..0,
);
db.insert_changesets(changesets.clone(), None).expect("insert changesets");
db.insert_history(changesets, None).expect("insert history");
let mdbx_count_before = db.table::<tables::AccountChangeSets>().unwrap().len();
assert!(mdbx_count_before > 0, "Should have MDBX changesets");
let static_file_provider = db.factory.static_file_provider();
let highest = static_file_provider
.get_highest_static_file_block(StaticFileSegment::AccountChangeSets);
assert!(highest.is_none(), "No static file coverage expected");
let prune_mode = PruneMode::Before(5);
let input =
PruneInput { previous_checkpoint: None, to_block: 5, limiter: PruneLimiter::default() };
let segment = AccountsHistoryPruner::new(prune_mode);
let provider = db.factory.database_provider_rw().unwrap();
let result = segment.prune(&provider, input).expect("prune should succeed");
provider.commit().expect("commit after prune");
assert_eq!(result.progress, PruneProgress::Finished);
assert!(result.pruned > 0, "Should have pruned some entries");
}
#[test]
fn test_prune_account_history_limiter_block_boundary() {
let db = setup_rocksdb_test_db();
let mut rng = generators::rng();
let blocks = random_block_range(
&mut rng,
0..=20,
BlockRangeParams { parent: Some(B256::ZERO), tx_count: 0..1, ..Default::default() },
);
db.insert_blocks(blocks.iter(), StorageKind::Database(None)).expect("insert blocks");
let addresses: Vec<Address> = (0..10)
.map(|i| {
let mut addr = Address::ZERO;
addr.0[0] = i as u8;
addr
})
.collect();
let static_file_provider = db.factory.static_file_provider();
{
let mut writer = static_file_provider
.latest_writer(StaticFileSegment::AccountChangeSets)
.expect("get writer");
for block_num in 0..=20 {
let changeset = generate_test_changeset(block_num, &addresses);
writer.append_account_changeset(changeset, block_num).expect("append changeset");
}
writer.commit().expect("commit static file");
}
let provider = db.factory.database_provider_rw().unwrap();
for block_num in 0..=20u64 {
for addr in &addresses {
provider
.tx_ref()
.put::<tables::AccountsHistory>(
reth_db_api::models::ShardedKey::new(*addr, block_num),
reth_db_api::BlockNumberList::new([block_num]).unwrap(),
)
.expect("insert history");
}
}
provider.commit().expect("commit provider");
let prune_mode = PruneMode::Before(15);
let limiter = PruneLimiter::default().set_deleted_entries_limit(25);
let input = PruneInput { previous_checkpoint: None, to_block: 15, limiter };
let segment = AccountsHistoryPruner::new(prune_mode);
let provider = db.factory.database_provider_rw().unwrap();
let result = segment.prune(&provider, input).expect("prune should succeed");
provider.commit().expect("commit after prune");
assert_eq!(
result.progress,
PruneProgress::HasMoreData(
reth_prune_types::PruneInterruptReason::DeletedEntriesLimitReached
),
"Should stop due to limit"
);
let checkpoint = result.checkpoint.expect("should have checkpoint");
let pruned_block = checkpoint.block_number.expect("should have block number");
assert!(pruned_block < 15, "Should have stopped before target block");
assert!(pruned_block >= 0, "Should have pruned at least one block");
}
#[test]
fn test_prune_account_history_empty_range() {
let db = setup_rocksdb_test_db();
let mut rng = generators::rng();
let blocks = random_block_range(
&mut rng,
1..=5,
BlockRangeParams { parent: Some(B256::ZERO), tx_count: 0..1, ..Default::default() },
);
db.insert_blocks(blocks.iter(), StorageKind::Database(None)).expect("insert blocks");
let prune_mode = PruneMode::Before(5);
let input = PruneInput {
previous_checkpoint: Some(reth_prune_types::PruneCheckpoint {
block_number: Some(5),
tx_number: None,
prune_mode,
}),
to_block: 5,
limiter: PruneLimiter::default(),
};
let segment = AccountsHistoryPruner::new(prune_mode);
let provider = db.factory.database_provider_rw().unwrap();
let result = segment.prune(&provider, input).expect("prune should succeed");
assert_eq!(result.progress, PruneProgress::Finished);
assert_eq!(result.pruned, 0, "Nothing to prune");
}
#[test]
fn test_prune_account_history_mixed_static_and_mdbx() {
let db = setup_rocksdb_test_db();
let mut rng = generators::rng();
let blocks = random_block_range(
&mut rng,
0..=20,
BlockRangeParams { parent: Some(B256::ZERO), tx_count: 0..1, ..Default::default() },
);
db.insert_blocks(blocks.iter(), StorageKind::Database(None)).expect("insert blocks");
let addresses: Vec<Address> = (0..3)
.map(|i| {
let mut addr = Address::ZERO;
addr.0[0] = i as u8;
addr
})
.collect();
let static_file_provider = db.factory.static_file_provider();
{
let mut writer = static_file_provider
.latest_writer(StaticFileSegment::AccountChangeSets)
.expect("get writer");
for block_num in 0..=10 {
let changeset = generate_test_changeset(block_num, &addresses);
writer.append_account_changeset(changeset, block_num).expect("append changeset");
}
writer.commit().expect("commit static file");
}
db.commit(|tx| {
for block_num in 11..=20u64 {
for addr in &addresses {
tx.put::<tables::AccountChangeSets>(
block_num,
AccountBeforeTx { address: *addr, info: None },
)?;
}
}
Ok(())
})
.expect("insert MDBX changesets");
let provider = db.factory.database_provider_rw().unwrap();
for block_num in 0..=20u64 {
for addr in &addresses {
provider
.tx_ref()
.put::<tables::AccountsHistory>(
reth_db_api::models::ShardedKey::new(*addr, block_num),
reth_db_api::BlockNumberList::new([block_num]).unwrap(),
)
.expect("insert history");
}
}
provider.commit().expect("commit provider");
let prune_mode = PruneMode::Before(15);
let input = PruneInput {
previous_checkpoint: None,
to_block: 15,
limiter: PruneLimiter::default(),
};
let segment = AccountsHistoryPruner::new(prune_mode);
let provider = db.factory.database_provider_rw().unwrap();
let result = segment.prune(&provider, input).expect("prune should succeed");
provider.commit().expect("commit after prune");
assert_eq!(result.progress, PruneProgress::Finished);
assert!(result.pruned > 0, "Should have pruned entries from both static and MDBX");
let checkpoint = result.checkpoint.expect("should have checkpoint");
assert_eq!(checkpoint.block_number, Some(15));
}
}

View File

@@ -0,0 +1,561 @@
//! `RocksDB` storage history index pruner.
use crate::{
segments::{PruneInput, Segment},
PrunerError,
};
use alloy_primitives::BlockNumber;
use reth_provider::{
DBProvider, EitherWriter, PruneShardOutcome, RocksDBProviderFactory, StaticFileProviderFactory,
StorageChangeSetReader, StorageSettingsCache,
};
use reth_prune_types::{
PruneMode, PrunePurpose, PruneSegment, SegmentOutput, SegmentOutputCheckpoint,
};
use reth_static_file_types::StaticFileSegment;
use rustc_hash::FxHashMap;
use std::ops::RangeInclusive;
use tracing::{instrument, trace};
#[derive(Debug)]
pub struct StoragesHistoryPruner {
mode: PruneMode,
}
impl StoragesHistoryPruner {
pub const fn new(mode: PruneMode) -> Self {
Self { mode }
}
/// Prune when storage changesets are in static files.
fn prune_static_files<Provider>(
&self,
provider: &Provider,
range: RangeInclusive<BlockNumber>,
range_end: BlockNumber,
input: PruneInput,
) -> Result<SegmentOutput, PrunerError>
where
Provider: StorageChangeSetReader + RocksDBProviderFactory + StaticFileProviderFactory,
{
let mut limiter = input.limiter;
if limiter.is_limit_reached() {
return Ok(SegmentOutput::not_done(
limiter.interrupt_reason(),
input.previous_checkpoint.map(SegmentOutputCheckpoint::from_prune_checkpoint),
))
}
let mut highest_deleted_storages = FxHashMap::default();
let mut last_changeset_pruned_block = None;
let mut scanned_changesets = 0usize;
let mut done = true;
for block in range {
let changes = provider.storage_block_changeset(block)?;
let changes_count = changes.len();
for change in changes {
highest_deleted_storages.insert((change.address, change.key), block);
}
scanned_changesets += changes_count;
limiter.increment_deleted_entries_count_by(changes_count);
last_changeset_pruned_block = Some(block);
if limiter.is_limit_reached() {
done = false;
break;
}
}
trace!(target: "pruner", scanned = %scanned_changesets, %done, "Scanned storage changesets");
let last_changeset_pruned_block = last_changeset_pruned_block.unwrap_or(range_end);
let mut keys_deleted = 0usize;
let mut keys_updated = 0usize;
provider.with_rocksdb_batch(|mut batch| {
for ((address, storage_key), highest_block) in &highest_deleted_storages {
let prune_to = (*highest_block).min(last_changeset_pruned_block);
match batch.prune_storage_history_to(*address, *storage_key, prune_to)? {
PruneShardOutcome::Deleted => keys_deleted += 1,
PruneShardOutcome::Updated => keys_updated += 1,
PruneShardOutcome::Unchanged => {}
}
}
Ok(((), Some(batch.into_inner())))
})?;
trace!(target: "pruner", keys_deleted, keys_updated, %done, "Pruned storage history (RocksDB indices)");
// Delete static file jars below the pruned block
provider.static_file_provider().delete_segment_below_block(
StaticFileSegment::StorageChangeSets,
last_changeset_pruned_block + 1,
)?;
let progress = limiter.progress(done);
Ok(SegmentOutput {
progress,
pruned: scanned_changesets + keys_deleted,
checkpoint: Some(SegmentOutputCheckpoint {
block_number: Some(last_changeset_pruned_block),
tx_number: None,
}),
})
}
/// Prune when storage changesets are in the database.
fn prune_database<Provider>(
&self,
provider: &Provider,
range: RangeInclusive<BlockNumber>,
range_end: BlockNumber,
input: PruneInput,
) -> Result<SegmentOutput, PrunerError>
where
Provider: StorageChangeSetReader + RocksDBProviderFactory,
{
let mut limiter = input.limiter;
if limiter.is_limit_reached() {
return Ok(SegmentOutput::not_done(
limiter.interrupt_reason(),
input.previous_checkpoint.map(SegmentOutputCheckpoint::from_prune_checkpoint),
))
}
let mut highest_deleted_storages = FxHashMap::default();
let mut last_changeset_pruned_block = None;
let mut scanned_changesets = 0usize;
let mut done = true;
for block in range {
let changes = provider.storage_block_changeset(block)?;
let changes_count = changes.len();
for change in changes {
highest_deleted_storages.insert((change.address, change.key), block);
}
scanned_changesets += changes_count;
limiter.increment_deleted_entries_count_by(changes_count);
last_changeset_pruned_block = Some(block);
if limiter.is_limit_reached() {
done = false;
break;
}
}
trace!(target: "pruner", scanned = %scanned_changesets, %done, "Scanned storage changesets");
let last_changeset_pruned_block = last_changeset_pruned_block.unwrap_or(range_end);
let mut keys_deleted = 0usize;
let mut keys_updated = 0usize;
provider.with_rocksdb_batch(|mut batch| {
for ((address, storage_key), highest_block) in &highest_deleted_storages {
let prune_to = (*highest_block).min(last_changeset_pruned_block);
match batch.prune_storage_history_to(*address, *storage_key, prune_to)? {
PruneShardOutcome::Deleted => keys_deleted += 1,
PruneShardOutcome::Updated => keys_updated += 1,
PruneShardOutcome::Unchanged => {}
}
}
Ok(((), Some(batch.into_inner())))
})?;
trace!(target: "pruner", keys_deleted, keys_updated, %done, "Pruned storage history (RocksDB indices)");
let progress = limiter.progress(done);
Ok(SegmentOutput {
progress,
pruned: scanned_changesets + keys_deleted,
checkpoint: Some(SegmentOutputCheckpoint {
block_number: Some(last_changeset_pruned_block),
tx_number: None,
}),
})
}
}
impl<Provider> Segment<Provider> for StoragesHistoryPruner
where
Provider: DBProvider
+ StorageChangeSetReader
+ RocksDBProviderFactory
+ StaticFileProviderFactory
+ StorageSettingsCache,
{
fn segment(&self) -> PruneSegment {
PruneSegment::StorageHistory
}
fn mode(&self) -> Option<PruneMode> {
Some(self.mode)
}
fn purpose(&self) -> PrunePurpose {
PrunePurpose::User
}
#[instrument(target = "pruner", skip(self, provider), ret(level = "trace"))]
fn prune(&self, provider: &Provider, input: PruneInput) -> Result<SegmentOutput, PrunerError> {
let range = match input.get_next_block_range() {
Some(range) => range,
None => {
trace!(target: "pruner", "No storage history to prune");
return Ok(SegmentOutput::done())
}
};
let range_end = *range.end();
if EitherWriter::storage_changesets_destination(provider).is_static_file() {
self.prune_static_files(provider, range, range_end, input)
} else {
self.prune_database(provider, range, range_end, input)
}
}
}
#[cfg(test)]
mod tests {
use super::*;
use crate::segments::{PruneInput, PruneLimiter, Segment};
use alloy_primitives::{Address, B256, U256};
use reth_db_api::{
models::{storage_sharded_key::StorageShardedKey, BlockNumberAddress, StorageBeforeTx},
tables,
transaction::DbTxMut,
BlockNumberList,
};
use reth_primitives_traits::StorageEntry;
use reth_provider::{
DatabaseProviderFactory, RocksDBProviderFactory, StaticFileProviderFactory,
StaticFileWriter,
};
use reth_prune_types::{PruneMode, PruneProgress};
use reth_stages::test_utils::{StorageKind, TestStageDB};
use reth_static_file_types::StaticFileSegment;
use reth_storage_api::{DBProvider, StorageSettings, StorageSettingsCache};
use reth_testing_utils::generators::{self, random_block_range, BlockRangeParams};
fn setup_rocksdb_test_db() -> TestStageDB {
let db = TestStageDB::default();
db.factory.set_storage_settings_cache(
StorageSettings::legacy().with_storages_history_in_rocksdb(true),
);
db
}
/// Helper to write storage changesets to static files for testing.
fn write_storage_changesets_to_static_files(
db: &TestStageDB,
changesets: &[(u64, Address, B256, U256)],
) {
let static_file_provider = db.factory.static_file_provider();
let mut writer =
static_file_provider.latest_writer(StaticFileSegment::StorageChangeSets).unwrap();
let mut current_block: Option<u64> = None;
let mut block_entries = Vec::new();
for (block_number, address, key, value) in changesets {
if current_block != Some(*block_number) {
if !block_entries.is_empty() {
writer
.append_storage_changeset(
std::mem::take(&mut block_entries),
current_block.unwrap(),
)
.unwrap();
}
current_block = Some(*block_number);
}
block_entries.push(StorageBeforeTx { address: *address, key: *key, value: *value });
}
if !block_entries.is_empty() {
writer.append_storage_changeset(block_entries, current_block.unwrap()).unwrap();
}
writer.commit().unwrap();
static_file_provider.initialize_index().unwrap();
}
/// Helper to write storage history to `RocksDB`.
fn write_storage_history_to_rocksdb(db: &TestStageDB, history: &[(Address, B256, Vec<u64>)]) {
let provider = db.factory.database_provider_rw().unwrap();
provider
.with_rocksdb_batch(|mut batch| {
for (address, key, blocks) in history {
let shard = BlockNumberList::new_pre_sorted(blocks.iter().copied());
batch
.put::<tables::StoragesHistory>(
StorageShardedKey::last(*address, *key),
&shard,
)
.unwrap();
}
Ok(((), Some(batch.into_inner())))
})
.unwrap();
provider.commit().unwrap();
}
/// Helper to read storage history from `RocksDB`.
fn read_storage_history_from_rocksdb(
db: &TestStageDB,
address: Address,
key: B256,
) -> Vec<u64> {
let provider = db.factory.database_provider_rw().unwrap();
let rocksdb = provider.rocksdb_provider();
let shards = rocksdb.storage_history_shards(address, key).unwrap();
shards.into_iter().flat_map(|(_, blocks)| blocks.iter().collect::<Vec<_>>()).collect()
}
#[test]
fn test_prune_storage_history_static_files_full_range() {
let db = setup_rocksdb_test_db();
let mut rng = generators::rng();
let blocks = random_block_range(
&mut rng,
0..=10,
BlockRangeParams { parent: Some(B256::ZERO), tx_count: 0..1, ..Default::default() },
);
db.insert_blocks(blocks.iter(), StorageKind::Database(None)).expect("insert blocks");
let addr1 = Address::from([0x11; 20]);
let addr2 = Address::from([0x22; 20]);
let key1 = B256::from([0x01; 32]);
let key2 = B256::from([0x02; 32]);
let changesets = vec![
(0, addr1, key1, U256::from(50)),
(1, addr1, key1, U256::from(100)),
(1, addr1, key2, U256::from(200)),
(2, addr1, key1, U256::from(110)),
(2, addr2, key1, U256::from(300)),
(3, addr1, key1, U256::from(120)),
(3, addr2, key2, U256::from(400)),
];
write_storage_changesets_to_static_files(&db, &changesets);
write_storage_history_to_rocksdb(
&db,
&[
(addr1, key1, vec![0, 1, 2, 3, 10, 20]),
(addr1, key2, vec![1, 15]),
(addr2, key1, vec![2, 25]),
(addr2, key2, vec![3, 30]),
],
);
let segment = StoragesHistoryPruner::new(PruneMode::Before(3));
let provider = db.factory.database_provider_rw().unwrap();
let input =
PruneInput { previous_checkpoint: None, to_block: 3, limiter: PruneLimiter::default() };
let result = segment.prune(&provider, input).unwrap();
provider.commit().unwrap();
assert_eq!(result.progress, PruneProgress::Finished);
assert!(result.pruned > 0);
let history1 = read_storage_history_from_rocksdb(&db, addr1, key1);
assert!(!history1.contains(&0));
assert!(!history1.contains(&1));
assert!(!history1.contains(&2));
assert!(!history1.contains(&3));
assert!(history1.contains(&10) || history1.contains(&20));
let history2 = read_storage_history_from_rocksdb(&db, addr1, key2);
assert!(!history2.contains(&1));
let history3 = read_storage_history_from_rocksdb(&db, addr2, key1);
assert!(!history3.contains(&2));
let history4 = read_storage_history_from_rocksdb(&db, addr2, key2);
assert!(!history4.contains(&3));
}
#[test]
fn test_prune_storage_history_mdbx_fallback() {
let db = setup_rocksdb_test_db();
let mut rng = generators::rng();
let blocks = random_block_range(
&mut rng,
1..=10,
BlockRangeParams { parent: Some(B256::ZERO), tx_count: 0..1, ..Default::default() },
);
db.insert_blocks(blocks.iter(), StorageKind::Database(None)).expect("insert blocks");
let addr1 = Address::from([0x11; 20]);
let key1 = B256::from([0x01; 32]);
db.commit(|tx| {
for block in 1..=5u64 {
tx.put::<tables::StorageChangeSets>(
BlockNumberAddress((block, addr1)),
StorageEntry { key: key1, value: U256::from(block * 100) },
)?;
}
Ok(())
})
.unwrap();
write_storage_history_to_rocksdb(&db, &[(addr1, key1, vec![1, 2, 3, 4, 5])]);
let segment = StoragesHistoryPruner::new(PruneMode::Before(3));
let provider = db.factory.database_provider_rw().unwrap();
let input =
PruneInput { previous_checkpoint: None, to_block: 3, limiter: PruneLimiter::default() };
let result = segment.prune(&provider, input).unwrap();
provider.commit().unwrap();
assert_eq!(result.progress, PruneProgress::Finished);
assert!(result.pruned > 0);
}
#[test]
fn test_prune_storage_history_limiter_block_boundary() {
let db = setup_rocksdb_test_db();
let mut rng = generators::rng();
let blocks = random_block_range(
&mut rng,
0..=20,
BlockRangeParams { parent: Some(B256::ZERO), tx_count: 0..1, ..Default::default() },
);
db.insert_blocks(blocks.iter(), StorageKind::Database(None)).expect("insert blocks");
let addr1 = Address::from([0x11; 20]);
let key1 = B256::from([0x01; 32]);
let mut changesets = Vec::new();
for block in 0..=10u64 {
for i in 0..10 {
changesets.push((block, addr1, B256::from([i as u8; 32]), U256::from(block * 100)));
}
}
write_storage_changesets_to_static_files(&db, &changesets);
let all_blocks: Vec<u64> = (0..=10).collect();
write_storage_history_to_rocksdb(&db, &[(addr1, key1, all_blocks)]);
let segment = StoragesHistoryPruner::new(PruneMode::Before(10));
let provider = db.factory.database_provider_rw().unwrap();
let limiter = PruneLimiter::default().set_deleted_entries_limit(15);
let input = PruneInput { previous_checkpoint: None, to_block: 10, limiter };
let result = segment.prune(&provider, input).unwrap();
provider.commit().unwrap();
assert_eq!(
result.progress,
PruneProgress::HasMoreData(
reth_prune_types::PruneInterruptReason::DeletedEntriesLimitReached
),
"Should stop due to limit"
);
let checkpoint = result.checkpoint.expect("should have checkpoint");
let pruned_block = checkpoint.block_number.expect("should have block number");
assert!(pruned_block < 10, "Should have stopped before target block");
assert!(pruned_block > 0, "Should have pruned at least some blocks");
}
#[test]
fn test_prune_storage_history_empty_range() {
let db = setup_rocksdb_test_db();
let mut rng = generators::rng();
let blocks = random_block_range(
&mut rng,
1..=5,
BlockRangeParams { parent: Some(B256::ZERO), tx_count: 0..1, ..Default::default() },
);
db.insert_blocks(blocks.iter(), StorageKind::Database(None)).expect("insert blocks");
let segment = StoragesHistoryPruner::new(PruneMode::Before(5));
let provider = db.factory.database_provider_rw().unwrap();
let input = PruneInput {
previous_checkpoint: Some(reth_prune_types::PruneCheckpoint {
block_number: Some(5),
tx_number: None,
prune_mode: PruneMode::Before(5),
}),
to_block: 5,
limiter: PruneLimiter::default(),
};
let result = segment.prune(&provider, input).unwrap();
assert_eq!(result.progress, PruneProgress::Finished);
assert_eq!(result.pruned, 0, "Nothing to prune");
}
#[test]
fn test_prune_storage_history_mixed_static_and_mdbx() {
let db = setup_rocksdb_test_db();
let mut rng = generators::rng();
let blocks = random_block_range(
&mut rng,
0..=20,
BlockRangeParams { parent: Some(B256::ZERO), tx_count: 0..1, ..Default::default() },
);
db.insert_blocks(blocks.iter(), StorageKind::Database(None)).expect("insert blocks");
let addr1 = Address::from([0x11; 20]);
let key1 = B256::from([0x01; 32]);
let mut changesets = Vec::new();
for block in 0..=10u64 {
changesets.push((block, addr1, key1, U256::from(block * 100)));
}
write_storage_changesets_to_static_files(&db, &changesets);
db.commit(|tx| {
for block in 11..=20u64 {
tx.put::<tables::StorageChangeSets>(
BlockNumberAddress((block, addr1)),
StorageEntry { key: key1, value: U256::from(block * 100) },
)?;
}
Ok(())
})
.unwrap();
write_storage_history_to_rocksdb(&db, &[(addr1, key1, (0..=20).collect())]);
let segment = StoragesHistoryPruner::new(PruneMode::Before(15));
let provider = db.factory.database_provider_rw().unwrap();
let input = PruneInput {
previous_checkpoint: None,
to_block: 15,
limiter: PruneLimiter::default(),
};
let result = segment.prune(&provider, input).unwrap();
provider.commit().unwrap();
assert_eq!(result.progress, PruneProgress::Finished);
assert!(result.pruned > 0, "Should have pruned entries from both static and MDBX");
let checkpoint = result.checkpoint.expect("should have checkpoint");
assert_eq!(checkpoint.block_number, Some(15));
}
}

View File

@@ -1,27 +1,20 @@
use crate::{
db_ext::DbTxPruneExt,
segments::{
user::history::{finalize_history_prune, HistoryPruneResult},
PruneInput, Segment,
},
segments::{user::history::prune_history_indices, PruneInput, Segment, SegmentOutput},
PrunerError,
};
use alloy_primitives::{Address, BlockNumber, B256};
use itertools::Itertools;
use reth_db_api::{
models::{storage_sharded_key::StorageShardedKey, BlockNumberAddress},
tables,
transaction::DbTxMut,
};
use reth_provider::{DBProvider, EitherWriter, StaticFileProviderFactory};
use reth_prune_types::{
PruneMode, PrunePurpose, PruneSegment, SegmentOutput, SegmentOutputCheckpoint,
};
use reth_static_file_types::StaticFileSegment;
use reth_storage_api::{StorageChangeSetReader, StorageSettingsCache};
use reth_provider::DBProvider;
use reth_prune_types::{PruneMode, PrunePurpose, PruneSegment, SegmentOutputCheckpoint};
use rustc_hash::FxHashMap;
use tracing::{instrument, trace};
/// Number of storage history tables to prune in one step.
/// Number of storage history tables to prune in one step
///
/// Storage History consists of two tables: [`tables::StorageChangeSets`] and
/// [`tables::StoragesHistory`]. We want to prune them to the same block number.
@@ -40,10 +33,7 @@ impl StorageHistory {
impl<Provider> Segment<Provider> for StorageHistory
where
Provider: DBProvider<Tx: DbTxMut>
+ StaticFileProviderFactory
+ StorageChangeSetReader
+ StorageSettingsCache,
Provider: DBProvider<Tx: DbTxMut>,
{
fn segment(&self) -> PruneSegment {
PruneSegment::StorageHistory
@@ -68,32 +58,11 @@ where
};
let range_end = *range.end();
if EitherWriter::storage_changesets_destination(provider).is_static_file() {
self.prune_static_files(provider, input, range, range_end)
} else {
self.prune_database(provider, input, range, range_end)
}
}
}
impl StorageHistory {
/// Prunes storage history when changesets are stored in static files.
fn prune_static_files<Provider>(
&self,
provider: &Provider,
input: PruneInput,
range: std::ops::RangeInclusive<BlockNumber>,
range_end: BlockNumber,
) -> Result<SegmentOutput, PrunerError>
where
Provider: DBProvider<Tx: DbTxMut> + StaticFileProviderFactory,
{
let mut limiter = if let Some(limit) = input.limiter.deleted_entries_limit() {
input.limiter.set_deleted_entries_limit(limit / STORAGE_HISTORY_TABLES_TO_PRUNE)
} else {
input.limiter
};
if limiter.is_limit_reached() {
return Ok(SegmentOutput::not_done(
limiter.interrupt_reason(),
@@ -101,90 +70,15 @@ impl StorageHistory {
))
}
// The size of this map is limited by `prune_delete_limit * blocks_since_last_run /
// STORAGE_HISTORY_TABLES_TO_PRUNE`, and with current defaults it's usually `3500 * 5
// / 2`, so 8750 entries. Each entry is `160 bit + 256 bit + 64 bit`, so the total
// size should be up to ~0.5MB + some hashmap overhead. `blocks_since_last_run` is
// additionally limited by the `max_reorg_depth`, so no OOM is expected here.
let mut highest_deleted_storages = FxHashMap::default();
let mut last_changeset_pruned_block = None;
let mut pruned_changesets = 0;
let mut done = true;
let walker = provider.static_file_provider().walk_storage_changeset_range(range);
for result in walker {
if limiter.is_limit_reached() {
done = false;
break;
}
let (block_address, entry) = result?;
let block_number = block_address.block_number();
let address = block_address.address();
highest_deleted_storages.insert((address, entry.key), block_number);
last_changeset_pruned_block = Some(block_number);
pruned_changesets += 1;
limiter.increment_deleted_entries_count();
}
// Delete static file jars below the pruned block
if let Some(last_block) = last_changeset_pruned_block {
provider
.static_file_provider()
.delete_segment_below_block(StaticFileSegment::StorageChangeSets, last_block + 1)?;
}
trace!(target: "pruner", pruned = %pruned_changesets, %done, "Pruned storage history (changesets from static files)");
let result = HistoryPruneResult {
highest_deleted: highest_deleted_storages,
last_pruned_block: last_changeset_pruned_block,
pruned_count: pruned_changesets,
done,
};
finalize_history_prune::<_, tables::StoragesHistory, (Address, B256), _>(
provider,
result,
range_end,
&limiter,
|(address, storage_key), block_number| {
StorageShardedKey::new(address, storage_key, block_number)
},
|a, b| a.address == b.address && a.sharded_key.key == b.sharded_key.key,
)
.map_err(Into::into)
}
fn prune_database<Provider>(
&self,
provider: &Provider,
input: PruneInput,
range: std::ops::RangeInclusive<BlockNumber>,
range_end: BlockNumber,
) -> Result<SegmentOutput, PrunerError>
where
Provider: DBProvider<Tx: DbTxMut>,
{
let mut limiter = if let Some(limit) = input.limiter.deleted_entries_limit() {
input.limiter.set_deleted_entries_limit(limit / STORAGE_HISTORY_TABLES_TO_PRUNE)
} else {
input.limiter
};
if limiter.is_limit_reached() {
return Ok(SegmentOutput::not_done(
limiter.interrupt_reason(),
input.previous_checkpoint.map(SegmentOutputCheckpoint::from_prune_checkpoint),
))
}
// Deleted storage changeset keys (account addresses and storage slots) with the highest
// block number deleted for that key.
//
// The size of this map is limited by `prune_delete_limit * blocks_since_last_run /
// STORAGE_HISTORY_TABLES_TO_PRUNE`, and with current defaults it's usually `3500 * 5
// The size of this map it's limited by `prune_delete_limit * blocks_since_last_run /
// STORAGE_HISTORY_TABLES_TO_PRUNE`, and with current default it's usually `3500 * 5
// / 2`, so 8750 entries. Each entry is `160 bit + 256 bit + 64 bit`, so the total
// size should be up to ~0.5MB + some hashmap overhead. `blocks_since_last_run` is
// size should be up to 0.5MB + some hashmap overhead. `blocks_since_last_run` is
// additionally limited by the `max_reorg_depth`, so no OOM is expected here.
let mut last_changeset_pruned_block = None;
let mut highest_deleted_storages = FxHashMap::default();
let (pruned_changesets, done) =
provider.tx_ref().prune_table_with_range::<tables::StorageChangeSets>(
@@ -198,46 +92,64 @@ impl StorageHistory {
)?;
trace!(target: "pruner", deleted = %pruned_changesets, %done, "Pruned storage history (changesets)");
let result = HistoryPruneResult {
highest_deleted: highest_deleted_storages,
last_pruned_block: last_changeset_pruned_block,
pruned_count: pruned_changesets,
done,
};
finalize_history_prune::<_, tables::StoragesHistory, (Address, B256), _>(
let last_changeset_pruned_block = last_changeset_pruned_block
// If there's more storage changesets to prune, set the checkpoint block number to
// previous, so we could finish pruning its storage changesets on the next run.
.map(|block_number| if done { block_number } else { block_number.saturating_sub(1) })
.unwrap_or(range_end);
// Sort highest deleted block numbers by account address and storage key and turn them into
// sharded keys.
// We did not use `BTreeMap` from the beginning, because it's inefficient for hashes.
let highest_sharded_keys = highest_deleted_storages
.into_iter()
.sorted_unstable() // Unstable is fine because no equal keys exist in the map
.map(|((address, storage_key), block_number)| {
StorageShardedKey::new(
address,
storage_key,
block_number.min(last_changeset_pruned_block),
)
});
let outcomes = prune_history_indices::<Provider, tables::StoragesHistory, _>(
provider,
result,
range_end,
&limiter,
|(address, storage_key), block_number| {
StorageShardedKey::new(address, storage_key, block_number)
},
highest_sharded_keys,
|a, b| a.address == b.address && a.sharded_key.key == b.sharded_key.key,
)
.map_err(Into::into)
)?;
trace!(target: "pruner", ?outcomes, %done, "Pruned storage history (indices)");
let progress = limiter.progress(done);
Ok(SegmentOutput {
progress,
pruned: pruned_changesets + outcomes.deleted,
checkpoint: Some(SegmentOutputCheckpoint {
block_number: Some(last_changeset_pruned_block),
tx_number: None,
}),
})
}
}
#[cfg(test)]
mod tests {
use super::STORAGE_HISTORY_TABLES_TO_PRUNE;
use crate::segments::{PruneInput, PruneLimiter, Segment, SegmentOutput, StorageHistory};
use crate::segments::{
user::storage_history::STORAGE_HISTORY_TABLES_TO_PRUNE, PruneInput, PruneLimiter, Segment,
SegmentOutput, StorageHistory,
};
use alloy_primitives::{BlockNumber, B256};
use assert_matches::assert_matches;
use reth_db_api::{models::StorageSettings, tables, BlockNumberList};
use reth_db_api::{tables, BlockNumberList};
use reth_provider::{DBProvider, DatabaseProviderFactory, PruneCheckpointReader};
use reth_prune_types::{
PruneCheckpoint, PruneInterruptReason, PruneMode, PruneProgress, PruneSegment,
};
use reth_prune_types::{PruneCheckpoint, PruneMode, PruneProgress, PruneSegment};
use reth_stages::test_utils::{StorageKind, TestStageDB};
use reth_storage_api::StorageSettingsCache;
use reth_testing_utils::generators::{
self, random_block_range, random_changeset_range, random_eoa_accounts, BlockRangeParams,
};
use std::{collections::BTreeMap, ops::AddAssign};
#[test]
fn prune_legacy() {
fn prune() {
let db = TestStageDB::default();
let mut rng = generators::rng();
@@ -296,9 +208,6 @@ mod tests {
let segment = StorageHistory::new(prune_mode);
let provider = db.factory.database_provider_rw().unwrap();
provider.set_storage_settings_cache(
StorageSettings::default().with_storage_changesets_in_static_files(false),
);
let result = segment.prune(&provider, input).unwrap();
limiter.increment_deleted_entries_count_by(result.pruned);
@@ -338,19 +247,19 @@ mod tests {
.map(|(i, _)| i)
.unwrap_or_default();
// Skip what we've pruned so far, subtracting one to get last pruned block number
// further down
let mut pruned_changesets = changesets.iter().skip(pruned.saturating_sub(1));
let mut pruned_changesets = changesets
.iter()
// Skip what we've pruned so far, subtracting one to get last pruned block number
// further down
.skip(pruned.saturating_sub(1));
let last_pruned_block_number = pruned_changesets
.next()
.map(|(block_number, _, _)| {
(if result.progress.is_finished() {
*block_number
} else {
block_number.saturating_sub(1)
}) as BlockNumber
})
.map(|(block_number, _, _)| if result.progress.is_finished() {
*block_number
} else {
block_number.saturating_sub(1)
} as BlockNumber)
.unwrap_or(to_block);
let pruned_changesets = pruned_changesets.fold(
@@ -397,160 +306,14 @@ mod tests {
test_prune(
998,
1,
(PruneProgress::HasMoreData(PruneInterruptReason::DeletedEntriesLimitReached), 500),
(
PruneProgress::HasMoreData(
reth_prune_types::PruneInterruptReason::DeletedEntriesLimitReached,
),
500,
),
);
test_prune(998, 2, (PruneProgress::Finished, 499));
test_prune(1200, 3, (PruneProgress::Finished, 202));
}
#[test]
fn prune_static_file() {
let db = TestStageDB::default();
let mut rng = generators::rng();
let blocks = random_block_range(
&mut rng,
0..=5000,
BlockRangeParams { parent: Some(B256::ZERO), tx_count: 0..1, ..Default::default() },
);
db.insert_blocks(blocks.iter(), StorageKind::Database(None)).expect("insert blocks");
let accounts = random_eoa_accounts(&mut rng, 2).into_iter().collect::<BTreeMap<_, _>>();
let (changesets, _) = random_changeset_range(
&mut rng,
blocks.iter(),
accounts.into_iter().map(|(addr, acc)| (addr, (acc, Vec::new()))),
1..2,
1..2,
);
db.insert_changesets_to_static_files(changesets.clone(), None)
.expect("insert changesets to static files");
db.insert_history(changesets.clone(), None).expect("insert history");
let storage_occurrences = db.table::<tables::StoragesHistory>().unwrap().into_iter().fold(
BTreeMap::<_, usize>::new(),
|mut map, (key, _)| {
map.entry((key.address, key.sharded_key.key)).or_default().add_assign(1);
map
},
);
assert!(storage_occurrences.into_iter().any(|(_, occurrences)| occurrences > 1));
let original_shards = db.table::<tables::StoragesHistory>().unwrap();
let test_prune = |to_block: BlockNumber,
run: usize,
expected_result: (PruneProgress, usize)| {
let prune_mode = PruneMode::Before(to_block);
let deleted_entries_limit = 1000;
let mut limiter =
PruneLimiter::default().set_deleted_entries_limit(deleted_entries_limit);
let input = PruneInput {
previous_checkpoint: db
.factory
.provider()
.unwrap()
.get_prune_checkpoint(PruneSegment::StorageHistory)
.unwrap(),
to_block,
limiter: limiter.clone(),
};
let segment = StorageHistory::new(prune_mode);
let provider = db.factory.database_provider_rw().unwrap();
provider.set_storage_settings_cache(
StorageSettings::default().with_storage_changesets_in_static_files(true),
);
let result = segment.prune(&provider, input).unwrap();
limiter.increment_deleted_entries_count_by(result.pruned);
assert_matches!(
result,
SegmentOutput {progress, pruned, checkpoint: Some(_)}
if (progress, pruned) == expected_result
);
segment
.save_checkpoint(
&provider,
result.checkpoint.unwrap().as_prune_checkpoint(prune_mode),
)
.unwrap();
provider.commit().expect("commit");
let changesets = changesets
.iter()
.enumerate()
.flat_map(|(block_number, changeset)| {
changeset.iter().flat_map(move |(address, _, entries)| {
entries.iter().map(move |entry| (block_number, address, entry))
})
})
.collect::<Vec<_>>();
#[expect(clippy::skip_while_next)]
let pruned = changesets
.iter()
.enumerate()
.skip_while(|(i, (block_number, _, _))| {
*i < deleted_entries_limit / STORAGE_HISTORY_TABLES_TO_PRUNE * run &&
*block_number <= to_block as usize
})
.next()
.map(|(i, _)| i)
.unwrap_or_default();
// Skip what we've pruned so far, subtracting one to get last pruned block number
// further down
let mut pruned_changesets = changesets.iter().skip(pruned.saturating_sub(1));
let last_pruned_block_number = pruned_changesets
.next()
.map(|(block_number, _, _)| {
(if result.progress.is_finished() {
*block_number
} else {
block_number.saturating_sub(1)
}) as BlockNumber
})
.unwrap_or(to_block);
let actual_shards = db.table::<tables::StoragesHistory>().unwrap();
let expected_shards = original_shards
.iter()
.filter(|(key, _)| key.sharded_key.highest_block_number > last_pruned_block_number)
.map(|(key, blocks)| {
let new_blocks =
blocks.iter().skip_while(|block| *block <= last_pruned_block_number);
(key.clone(), BlockNumberList::new_pre_sorted(new_blocks))
})
.collect::<Vec<_>>();
assert_eq!(actual_shards, expected_shards);
assert_eq!(
db.factory
.provider()
.unwrap()
.get_prune_checkpoint(PruneSegment::StorageHistory)
.unwrap(),
Some(PruneCheckpoint {
block_number: Some(last_pruned_block_number),
tx_number: None,
prune_mode
})
);
};
test_prune(
998,
1,
(PruneProgress::HasMoreData(PruneInterruptReason::DeletedEntriesLimitReached), 500),
);
test_prune(998, 2, (PruneProgress::Finished, 500));
test_prune(1200, 3, (PruneProgress::Finished, 202));
}
}

View File

@@ -24,9 +24,9 @@ pub enum PruneSegment {
Receipts,
/// Prune segment responsible for some rows in `Receipts` table filtered by logs.
ContractLogs,
/// Prunes account changesets (static files/MDBX) and `AccountsHistory`.
/// Prune segment responsible for the `AccountChangeSets` and `AccountsHistory` tables.
AccountHistory,
/// Prunes storage changesets (static files/MDBX) and `StoragesHistory`.
/// Prune segment responsible for the `StorageChangeSets` and `StoragesHistory` tables.
StorageHistory,
#[deprecated = "Variant indexes cannot be changed"]
#[strum(disabled)]

View File

@@ -122,7 +122,6 @@ impl RethRpcServerConfig for RpcServerArgs {
max_receipts: self.rpc_state_cache.max_receipts,
max_headers: self.rpc_state_cache.max_headers,
max_concurrent_db_requests: self.rpc_state_cache.max_concurrent_db_requests,
max_cached_tx_hashes: self.rpc_state_cache.max_cached_tx_hashes,
}
}

View File

@@ -3,13 +3,6 @@
use std::collections::HashSet;
use tracing::warn;
/// Critical Engine API method prefixes that warrant warnings on capability mismatches.
///
/// These are essential for block production and chain synchronization. Missing support
/// for these methods indicates a significant version mismatch that operators should address.
const CRITICAL_METHOD_PREFIXES: &[&str] =
&["engine_forkchoiceUpdated", "engine_getPayload", "engine_newPayload"];
/// All Engine API capabilities supported by Reth (Ethereum mainnet).
///
/// See <https://github.com/ethereum/execution-apis/tree/main/src/engine> for updates.
@@ -79,52 +72,31 @@ impl EngineCapabilities {
CapabilityMismatches { missing_in_el, missing_in_cl }
}
/// Logs warnings if CL and EL capabilities don't match for critical methods.
/// Logs warnings if CL and EL capabilities don't match.
///
/// Called during `engine_exchangeCapabilities` to warn operators about
/// version mismatches between the consensus layer and execution layer.
///
/// Only warns about critical methods (`engine_forkchoiceUpdated`, `engine_getPayload`,
/// `engine_newPayload`) that are essential for block production and chain synchronization.
/// Non-critical methods like `engine_getBlobs` are not warned about since not all
/// clients support them.
pub fn log_capability_mismatches(&self, cl_capabilities: &[String]) {
let mismatches = self.get_capability_mismatches(cl_capabilities);
let critical_missing_in_el: Vec<_> =
mismatches.missing_in_el.iter().filter(|m| is_critical_method(m)).cloned().collect();
let critical_missing_in_cl: Vec<_> =
mismatches.missing_in_cl.iter().filter(|m| is_critical_method(m)).cloned().collect();
if !critical_missing_in_el.is_empty() {
if !mismatches.missing_in_el.is_empty() {
warn!(
target: "rpc::engine",
missing = ?critical_missing_in_el,
missing = ?mismatches.missing_in_el,
"CL supports Engine API methods that Reth doesn't. Consider upgrading Reth."
);
}
if !critical_missing_in_cl.is_empty() {
if !mismatches.missing_in_cl.is_empty() {
warn!(
target: "rpc::engine",
missing = ?critical_missing_in_cl,
missing = ?mismatches.missing_in_cl,
"Reth supports Engine API methods that CL doesn't. Consider upgrading your consensus client."
);
}
}
}
/// Returns `true` if the method is critical for block production and chain synchronization.
fn is_critical_method(method: &str) -> bool {
CRITICAL_METHOD_PREFIXES.iter().any(|prefix| {
method.starts_with(prefix) &&
method[prefix.len()..]
.strip_prefix('V')
.is_some_and(|s| s.chars().next().is_some_and(|c| c.is_ascii_digit()))
})
}
impl Default for EngineCapabilities {
fn default() -> Self {
Self::new(CAPABILITIES.iter().copied())
@@ -201,20 +173,4 @@ mod tests {
assert_eq!(result.missing_in_el, vec!["a_other", "z_other"]);
assert_eq!(result.missing_in_cl, vec!["a_method", "z_method"]);
}
#[test]
fn test_is_critical_method() {
assert!(is_critical_method("engine_forkchoiceUpdatedV1"));
assert!(is_critical_method("engine_forkchoiceUpdatedV3"));
assert!(is_critical_method("engine_getPayloadV1"));
assert!(is_critical_method("engine_getPayloadV4"));
assert!(is_critical_method("engine_newPayloadV1"));
assert!(is_critical_method("engine_newPayloadV4"));
assert!(!is_critical_method("engine_getBlobsV1"));
assert!(!is_critical_method("engine_getBlobsV3"));
assert!(!is_critical_method("engine_getPayloadBodiesByHashV1"));
assert!(!is_critical_method("engine_getPayloadBodiesByRangeV1"));
assert!(!is_critical_method("engine_getClientVersionV1"));
}
}

View File

@@ -20,8 +20,8 @@ use alloy_rpc_types_eth::{
use futures::Future;
use reth_errors::{ProviderError, RethError};
use reth_evm::{
env::BlockEnvironment, execute::BlockBuilder, ConfigureEvm, Evm, EvmEnvFor, HaltReasonFor,
InspectorFor, TransactionEnv, TxEnvFor,
env::BlockEnvironment, ConfigureEvm, Evm, EvmEnvFor, HaltReasonFor, InspectorFor,
TransactionEnv, TxEnvFor,
};
use reth_node_api::BlockBody;
use reth_primitives_traits::Recovered;
@@ -96,37 +96,7 @@ pub trait EthCall: EstimateCall + Call + LoadPendingBlock + LoadBlock + FullEthA
self.spawn_with_state_at_block(block, move |this, mut db| {
let mut blocks: Vec<SimulatedBlock<RpcBlock<Self::NetworkTypes>>> =
Vec::with_capacity(block_state_calls.len());
// Track previous block number and timestamp for validation
let mut prev_block_number = parent.number();
let mut prev_timestamp = parent.timestamp();
for block in block_state_calls {
// Validate block number ordering if overridden
if let Some(number) = block.block_overrides.as_ref().and_then(|o| o.number) {
let number: u64 = number.try_into().unwrap_or(u64::MAX);
if number <= prev_block_number {
return Err(EthApiError::other(EthSimulateError::BlockNumberInvalid {
got: number,
parent: prev_block_number,
})
.into());
}
}
// Validate timestamp ordering if overridden
if let Some(time) = block
.block_overrides
.as_ref()
.and_then(|o| o.time)
.filter(|&t| t <= prev_timestamp)
{
return Err(EthApiError::other(EthSimulateError::BlockTimestampInvalid {
got: time,
parent: prev_timestamp,
})
.into());
}
let mut evm_env = this
.evm_config()
.next_evm_env(&parent, &this.next_env_attributes(&parent)?)
@@ -146,11 +116,6 @@ pub trait EthCall: EstimateCall + Call + LoadPendingBlock + LoadBlock + FullEthA
let SimBlock { block_overrides, state_overrides, calls } = block;
// Set prevrandao to zero for simulated blocks by default,
// matching spec behavior where MixDigest is zero-initialized.
// If user provides an override, it will be applied by apply_block_overrides.
evm_env.block_env.inner_mut().prevrandao = Some(B256::ZERO);
if let Some(block_overrides) = block_overrides {
// ensure we don't allow uncapped gas limit per block
if let Some(gas_limit_override) = block_overrides.gas_limit &&
@@ -165,8 +130,8 @@ pub trait EthCall: EstimateCall + Call + LoadPendingBlock + LoadBlock + FullEthA
evm_env.block_env.inner_mut(),
);
}
if let Some(ref state_overrides) = state_overrides {
apply_state_overrides(state_overrides.clone(), &mut db)
if let Some(state_overrides) = state_overrides {
apply_state_overrides(state_overrides, &mut db)
.map_err(Self::Error::from_eth_err)?;
}
@@ -187,17 +152,7 @@ pub trait EthCall: EstimateCall + Call + LoadPendingBlock + LoadBlock + FullEthA
}
if txs_without_gas_limit > 0 {
// Per spec: "gasLimit: blockGasLimit - soFarUsedGasInBlock"
// Divide remaining gas equally among transactions without gas
let gas_per_tx = (block_gas_limit - total_specified_gas) /
txs_without_gas_limit as u64;
// Cap to RPC gas limit, matching spec behavior
let call_gas_limit = this.call_gas_limit();
if call_gas_limit > 0 {
gas_per_tx.min(call_gas_limit)
} else {
gas_per_tx
}
(block_gas_limit - total_specified_gas) / txs_without_gas_limit as u64
} else {
0
}
@@ -222,16 +177,7 @@ pub trait EthCall: EstimateCall + Call + LoadPendingBlock + LoadBlock + FullEthA
let evm = this
.evm_config()
.evm_with_env_and_inspector(&mut db, evm_env, inspector);
let mut builder = this.evm_config().create_block_builder(evm, &parent, ctx);
if let Some(ref state_overrides) = state_overrides {
simulate::apply_precompile_overrides(
state_overrides,
builder.evm_mut().precompiles_mut(),
)
.map_err(|e| Self::Error::from_eth_err(EthApiError::other(e)))?;
}
let builder = this.evm_config().create_block_builder(evm, &parent, ctx);
simulate::execute_transactions(
builder,
calls,
@@ -242,16 +188,7 @@ pub trait EthCall: EstimateCall + Call + LoadPendingBlock + LoadBlock + FullEthA
.map_err(map_err)?
} else {
let evm = this.evm_config().evm_with_env(&mut db, evm_env);
let mut builder = this.evm_config().create_block_builder(evm, &parent, ctx);
if let Some(ref state_overrides) = state_overrides {
simulate::apply_precompile_overrides(
state_overrides,
builder.evm_mut().precompiles_mut(),
)
.map_err(|e| Self::Error::from_eth_err(EthApiError::other(e)))?;
}
let builder = this.evm_config().create_block_builder(evm, &parent, ctx);
simulate::execute_transactions(
builder,
calls,
@@ -264,10 +201,6 @@ pub trait EthCall: EstimateCall + Call + LoadPendingBlock + LoadBlock + FullEthA
parent = result.block.clone_sealed_header();
// Update tracking for next iteration's validation
prev_block_number = parent.number();
prev_timestamp = parent.timestamp();
let block = simulate::build_simulated_block::<Self::Error, _>(
result.block,
results,

View File

@@ -79,7 +79,7 @@ where
blob_schedule: chain_spec
.blob_params_at_timestamp(timestamp)
// no blob support, so we set this to original cancun values as defined in eip-4844
.unwrap_or_else(BlobParams::cancun),
.unwrap_or(BlobParams::cancun()),
chain_id: chain_spec.chain().id(),
fork_id,
precompiles,

View File

@@ -310,14 +310,12 @@ pub trait Trace: LoadState<Error: FromEvmError<Self::Evm>> + Call {
.evm_factory()
.create_tracer(&mut db, evm_env, inspector_setup())
.try_trace_many(block.transactions_recovered().take(max_transactions), |ctx| {
#[allow(clippy::needless_update)]
let tx_info = TransactionInfo {
hash: Some(*ctx.tx.tx_hash()),
index: Some(idx),
block_hash: Some(block_hash),
block_number: Some(block_number),
base_fee: Some(base_fee),
..Default::default()
};
idx += 1;

View File

@@ -290,14 +290,12 @@ pub trait EthTransactions: LoadTransaction<Provider: BlockReaderIdExt> {
let block_number = block.number();
let base_fee_per_gas = block.base_fee_per_gas();
if let Some((signer, tx)) = block.transactions_with_sender().nth(index) {
#[allow(clippy::needless_update)]
let tx_info = TransactionInfo {
hash: Some(*tx.tx_hash()),
block_hash: Some(block_hash),
block_number: Some(block_number),
base_fee: base_fee_per_gas,
index: Some(index as u64),
..Default::default()
};
return Ok(Some(
@@ -368,14 +366,12 @@ pub trait EthTransactions: LoadTransaction<Provider: BlockReaderIdExt> {
.enumerate()
.find(|(_, (signer, tx))| **signer == sender && (*tx).nonce() == nonce)
.map(|(index, (signer, tx))| {
#[allow(clippy::needless_update)]
let tx_info = TransactionInfo {
hash: Some(*tx.tx_hash()),
block_hash: Some(block_hash),
block_number: Some(block_number),
base_fee: base_fee_per_gas,
index: Some(index as u64),
..Default::default()
};
Ok(self.converter().fill(tx.clone().with_signer(*signer), tx_info)?)
})
@@ -623,20 +619,7 @@ pub trait LoadTransaction: SpawnBlocking + FullEthApiTypes + RpcNodeCoreExt {
Output = Result<Option<TransactionSource<ProviderTx<Self::Provider>>>, Self::Error>,
> + Send {
async move {
// First, try the RPC cache
if let Some(cached) = self.cache().get_transaction_by_hash(hash).await &&
let Some(tx) = cached.recovered_transaction()
{
return Ok(Some(TransactionSource::Block {
transaction: tx.cloned(),
index: cached.tx_index as u64,
block_hash: cached.block.hash(),
block_number: cached.block.number(),
base_fee: cached.block.base_fee_per_gas(),
}));
}
// Cache miss - try to find the transaction on disk
// Try to find the transaction on disk
if let Some((tx, meta)) = self
.spawn_blocking_io(move |this| {
this.provider()

View File

@@ -2,68 +2,15 @@
use std::sync::Arc;
use alloy_consensus::{transaction::TxHashRef, TxReceipt};
use alloy_consensus::TxReceipt;
use alloy_primitives::TxHash;
use reth_primitives_traits::{
Block, BlockBody, BlockTy, IndexedTx, NodePrimitives, ReceiptTy, Recovered, RecoveredBlock,
SealedBlock,
BlockTy, IndexedTx, NodePrimitives, ReceiptTy, RecoveredBlock, SealedBlock,
};
use reth_rpc_convert::{transaction::ConvertReceiptInput, RpcConvert, RpcTypes};
use crate::utils::calculate_gas_used_and_next_log_index;
/// Cached data for a transaction lookup.
#[derive(Debug, Clone)]
pub struct CachedTransaction<B: Block, R> {
/// The block containing this transaction.
pub block: Arc<RecoveredBlock<B>>,
/// Index of the transaction within the block.
pub tx_index: usize,
/// Receipts for the block, if available.
pub receipts: Option<Arc<Vec<R>>>,
}
impl<B: Block, R> CachedTransaction<B, R> {
/// Creates a new cached transaction entry.
pub const fn new(
block: Arc<RecoveredBlock<B>>,
tx_index: usize,
receipts: Option<Arc<Vec<R>>>,
) -> Self {
Self { block, tx_index, receipts }
}
/// Returns the `Recovered<&T>` transaction at the cached index.
pub fn recovered_transaction(&self) -> Option<Recovered<&<B::Body as BlockBody>::Transaction>> {
self.block.recovered_transaction(self.tx_index)
}
/// Converts this cached transaction into an RPC receipt using the given converter.
///
/// Returns `None` if receipts are not available or the transaction index is out of bounds.
pub fn into_receipt<N, C>(
self,
converter: &C,
) -> Option<Result<<C::Network as RpcTypes>::Receipt, C::Error>>
where
N: NodePrimitives<Block = B, Receipt = R>,
R: TxReceipt + Clone,
C: RpcConvert<Primitives = N>,
{
let receipts = self.receipts?;
let receipt = receipts.get(self.tx_index)?;
let tx_hash = *self.block.body().transactions().get(self.tx_index)?.tx_hash();
let tx = self.block.find_indexed(tx_hash)?;
convert_transaction_receipt::<N, C>(
self.block.as_ref(),
receipts.as_ref(),
tx,
receipt,
converter,
)
}
}
/// A pair of an [`Arc`] wrapped [`RecoveredBlock`] and its corresponding receipts.
///
/// This type is used throughout the RPC layer to efficiently pass around

View File

@@ -4,7 +4,7 @@ use serde::{Deserialize, Serialize};
use reth_rpc_server_types::constants::cache::{
DEFAULT_BLOCK_CACHE_MAX_LEN, DEFAULT_CONCURRENT_DB_REQUESTS, DEFAULT_HEADER_CACHE_MAX_LEN,
DEFAULT_MAX_CACHED_TX_HASHES, DEFAULT_RECEIPT_CACHE_MAX_LEN,
DEFAULT_RECEIPT_CACHE_MAX_LEN,
};
/// Settings for the [`EthStateCache`](super::EthStateCache).
@@ -27,8 +27,6 @@ pub struct EthStateCacheConfig {
///
/// Default is 512.
pub max_concurrent_db_requests: usize,
/// Maximum number of transaction hashes to cache for transaction lookups.
pub max_cached_tx_hashes: u32,
}
impl Default for EthStateCacheConfig {
@@ -38,7 +36,6 @@ impl Default for EthStateCacheConfig {
max_receipts: DEFAULT_RECEIPT_CACHE_MAX_LEN,
max_headers: DEFAULT_HEADER_CACHE_MAX_LEN,
max_concurrent_db_requests: DEFAULT_CONCURRENT_DB_REQUESTS,
max_cached_tx_hashes: DEFAULT_MAX_CACHED_TX_HASHES,
}
}
}

View File

@@ -1,18 +1,17 @@
//! Async caching support for eth RPC
use super::{EthStateCacheConfig, MultiConsumerLruCache};
use crate::block::CachedTransaction;
use alloy_consensus::{transaction::TxHashRef, BlockHeader};
use alloy_consensus::BlockHeader;
use alloy_eips::BlockHashOrNumber;
use alloy_primitives::{TxHash, B256};
use alloy_primitives::B256;
use futures::{stream::FuturesOrdered, Stream, StreamExt};
use reth_chain_state::CanonStateNotification;
use reth_errors::{ProviderError, ProviderResult};
use reth_execution_types::Chain;
use reth_primitives_traits::{Block, BlockBody, NodePrimitives, RecoveredBlock};
use reth_primitives_traits::{Block, NodePrimitives, RecoveredBlock};
use reth_storage_api::{BlockReader, TransactionVariant};
use reth_tasks::{TaskSpawner, TokioTaskExecutor};
use schnellru::{ByLength, Limiter, LruMap};
use schnellru::{ByLength, Limiter};
use std::{
future::Future,
pin::Pin,
@@ -48,9 +47,6 @@ type HeaderResponseSender<H> = oneshot::Sender<ProviderResult<H>>;
/// The type that can send the response with a chain of cached blocks
type CachedParentBlocksResponseSender<B> = oneshot::Sender<Vec<Arc<RecoveredBlock<B>>>>;
/// The type that can send the response for a transaction hash lookup
type TransactionHashResponseSender<B, R> = oneshot::Sender<Option<CachedTransaction<B, R>>>;
type BlockLruCache<B, L> =
MultiConsumerLruCache<B256, Arc<RecoveredBlock<B>>, L, BlockWithSendersResponseSender<B>>;
@@ -83,13 +79,11 @@ impl<N: NodePrimitives> EthStateCache<N> {
max_receipts: u32,
max_headers: u32,
max_concurrent_db_operations: usize,
max_cached_tx_hashes: u32,
) -> (Self, EthStateCacheService<Provider, Tasks>)
where
Provider: BlockReader<Block = N::Block, Receipt = N::Receipt>,
{
let (to_service, rx) = unbounded_channel();
let service = EthStateCacheService {
provider,
full_block_cache: BlockLruCache::new(max_blocks, "blocks"),
@@ -99,7 +93,6 @@ impl<N: NodePrimitives> EthStateCache<N> {
action_rx: UnboundedReceiverStream::new(rx),
action_task_spawner,
rate_limiter: Arc::new(Semaphore::new(max_concurrent_db_operations)),
tx_hash_index: LruMap::new(ByLength::new(max_cached_tx_hashes)),
};
let cache = Self { to_service };
(cache, service)
@@ -134,7 +127,6 @@ impl<N: NodePrimitives> EthStateCache<N> {
max_receipts,
max_headers,
max_concurrent_db_requests,
max_cached_tx_hashes,
} = config;
let (this, service) = Self::create(
provider,
@@ -143,7 +135,6 @@ impl<N: NodePrimitives> EthStateCache<N> {
max_receipts,
max_headers,
max_concurrent_db_requests,
max_cached_tx_hashes,
);
executor.spawn_critical("eth state cache", Box::pin(service));
this
@@ -264,19 +255,6 @@ impl<N: NodePrimitives> EthStateCache<N> {
Some(blocks)
}
}
/// Looks up a transaction by its hash in the cache index.
///
/// Returns the cached block, transaction index, and optionally receipts if the transaction
/// is in a cached block.
pub async fn get_transaction_by_hash(
&self,
tx_hash: TxHash,
) -> Option<CachedTransaction<N::Block, N::Receipt>> {
let (response_tx, rx) = oneshot::channel();
let _ = self.to_service.send(CacheAction::GetTransactionByHash { tx_hash, response_tx });
rx.await.ok()?
}
}
/// Thrown when the cache service task dropped.
#[derive(Debug, thiserror::Error)]
@@ -339,8 +317,6 @@ pub(crate) struct EthStateCacheService<
///
/// This restricts the max concurrent fetch tasks at the same time.
rate_limiter: Arc<Semaphore>,
/// LRU index mapping transaction hashes to their block hash and index within the block.
tx_hash_index: LruMap<TxHash, (B256, usize), ByLength>,
}
impl<Provider, Tasks> EthStateCacheService<Provider, Tasks>
@@ -348,29 +324,6 @@ where
Provider: BlockReader + Clone + Unpin + 'static,
Tasks: TaskSpawner + Clone + 'static,
{
/// Indexes all transactions in a block by transaction hash.
fn index_block_transactions(&mut self, block: &RecoveredBlock<Provider::Block>) {
let block_hash = block.hash();
for (tx_idx, tx) in block.body().transactions().iter().enumerate() {
self.tx_hash_index.insert(*tx.tx_hash(), (block_hash, tx_idx));
}
}
/// Removes transaction index entries for a reorged block.
///
/// Only removes entries that still point to this block, preserving mappings for transactions
/// that were re-mined in a new canonical block.
fn remove_block_transactions(&mut self, block: &RecoveredBlock<Provider::Block>) {
let block_hash = block.hash();
for tx in block.body().transactions() {
if let Some((mapped_hash, _)) = self.tx_hash_index.get(tx.tx_hash()) &&
*mapped_hash == block_hash
{
self.tx_hash_index.remove(tx.tx_hash());
}
}
}
fn on_new_block(
&mut self,
block_hash: B256,
@@ -597,8 +550,6 @@ where
}
CacheAction::CacheNewCanonicalChain { chain_change } => {
for block in chain_change.blocks {
// Index transactions before caching the block
this.index_block_transactions(&block);
this.on_new_block(block.hash(), Ok(Some(Arc::new(block))));
}
@@ -611,8 +562,6 @@ where
}
CacheAction::RemoveReorgedChain { chain_change } => {
for block in chain_change.blocks {
// Remove transaction index entries for reorged blocks
this.remove_block_transactions(&block);
this.on_reorg_block(block.hash(), Ok(Some(block)));
}
@@ -647,15 +596,6 @@ where
let _ = response_tx.send(blocks);
}
CacheAction::GetTransactionByHash { tx_hash, response_tx } => {
let result =
this.tx_hash_index.get(&tx_hash).and_then(|(block_hash, idx)| {
let block = this.full_block_cache.get(block_hash).cloned()?;
let receipts = this.receipts_cache.get(block_hash).cloned();
Some(CachedTransaction::new(block, *idx, receipts))
});
let _ = response_tx.send(result);
}
};
this.update_cached_metrics();
}
@@ -709,11 +649,6 @@ enum CacheAction<B: Block, R> {
max_blocks: usize,
response_tx: CachedParentBlocksResponseSender<B>,
},
/// Look up a transaction's cached data by its hash
GetTransactionByHash {
tx_hash: TxHash,
response_tx: TransactionHashResponseSender<B, R>,
},
}
struct BlockReceipts<R> {

View File

@@ -27,7 +27,6 @@ pub mod tx_forward;
pub mod utils;
pub use alloy_rpc_types_eth::FillTransaction;
pub use block::CachedTransaction;
pub use builder::config::{EthConfig, EthFilterConfig};
pub use cache::{
config::EthStateCacheConfig, db::StateCacheDb, multi_consumer::MultiConsumerLruCache,

View File

@@ -6,11 +6,9 @@ use crate::{
};
use alloy_consensus::{transaction::TxHashRef, BlockHeader, Transaction as _};
use alloy_eips::eip2718::WithEncoded;
use alloy_evm::precompiles::PrecompilesMap;
use alloy_network::TransactionBuilder;
use alloy_rpc_types_eth::{
simulate::{SimCallResult, SimulateError, SimulatedBlock},
state::StateOverride,
BlockTransactionsKind,
};
use jsonrpsee_types::ErrorObject;
@@ -29,16 +27,6 @@ use revm::{
Database,
};
/// Error code for execution reverted in `eth_simulateV1`.
///
/// <https://github.com/ethereum/execution-apis>
pub const SIMULATE_REVERT_CODE: i32 = -32000;
/// Error code for VM execution errors (e.g., out of gas) in `eth_simulateV1`.
///
/// <https://github.com/ethereum/execution-apis>
pub const SIMULATE_VM_ERROR_CODE: i32 = -32015;
/// Errors which may occur during `eth_simulateV1` execution.
#[derive(Debug, thiserror::Error)]
pub enum EthSimulateError {
@@ -49,21 +37,11 @@ pub enum EthSimulateError {
#[error("Client adjustable limit reached")]
GasLimitReached,
/// Block number in sequence did not increase.
#[error("block numbers must be in order: {got} <= {parent}")]
BlockNumberInvalid {
/// The block number that was provided.
got: u64,
/// The parent block number.
parent: u64,
},
/// Block timestamp in sequence did not increase.
#[error("block timestamps must be in order: {got} <= {parent}")]
BlockTimestampInvalid {
/// The block timestamp that was provided.
got: u64,
/// The parent block timestamp.
parent: u64,
},
#[error("Block number in sequence did not increase")]
BlockNumberInvalid,
/// Block timestamp in sequence did not increase or stay the same.
#[error("Block timestamp in sequence did not increase")]
BlockTimestampInvalid,
/// Transaction nonce is too low.
#[error("nonce too low: next nonce {state}, tx nonce {tx}")]
NonceTooLow {
@@ -101,9 +79,6 @@ pub enum EthSimulateError {
/// Multiple `MovePrecompileToAddress` referencing the same address.
#[error("Multiple MovePrecompileToAddress referencing the same address")]
PrecompileDuplicateAddress,
/// Attempted to move a non-precompile address.
#[error("account {0} is not a precompile")]
NotAPrecompile(Address),
}
impl EthSimulateError {
@@ -116,14 +91,13 @@ impl EthSimulateError {
Self::IntrinsicGasTooLow => -38013,
Self::InsufficientFunds { .. } => -38014,
Self::BlockGasLimitExceeded => -38015,
Self::BlockNumberInvalid { .. } => -38020,
Self::BlockTimestampInvalid { .. } => -38021,
Self::BlockNumberInvalid => -38020,
Self::BlockTimestampInvalid => -38021,
Self::PrecompileSelfReference => -38022,
Self::PrecompileDuplicateAddress => -38023,
Self::SenderNotEOA => -38024,
Self::MaxInitCodeSizeExceeded => -38025,
Self::GasLimitReached => -38026,
Self::NotAPrecompile(_) => -32000,
}
}
}
@@ -134,76 +108,6 @@ impl ToRpcError for EthSimulateError {
}
}
/// Applies precompile move overrides from state overrides to the EVM's precompiles map.
///
/// This function processes `movePrecompileToAddress` entries from the state overrides and
/// moves precompiles from their original addresses to new addresses. The original address
/// is cleared (precompile removed) and the precompile is installed at the destination address.
///
/// # Validation
///
/// - The source address must be a precompile (exists in the precompiles map)
/// - Moving multiple precompiles to the same destination is allowed
/// - Self-references (moving to the same address) are not explicitly forbidden here since that
/// would be a no-op
///
/// # Arguments
///
/// * `state_overrides` - The state overrides containing potential `movePrecompileToAddress` entries
/// * `precompiles` - Mutable reference to the EVM's precompiles map
///
/// # Returns
///
/// Returns `Ok(())` on success, or an `EthSimulateError::NotAPrecompile` if a source address
/// is not a precompile.
pub fn apply_precompile_overrides(
state_overrides: &StateOverride,
precompiles: &mut PrecompilesMap,
) -> Result<(), EthSimulateError> {
use alloy_evm::precompiles::DynPrecompile;
let moves: Vec<_> = state_overrides
.iter()
.filter_map(|(source, account_override)| {
account_override.move_precompile_to.map(|dest| (*source, dest))
})
.collect();
if moves.is_empty() {
return Ok(());
}
for (source, _) in &moves {
if precompiles.get(source).is_none() {
return Err(EthSimulateError::NotAPrecompile(*source));
}
}
let mut extracted: Vec<(Address, Address, DynPrecompile)> = Vec::with_capacity(moves.len());
for (source, dest) in moves {
if source == dest {
continue;
}
let mut found_precompile: Option<DynPrecompile> = None;
precompiles.apply_precompile(&source, |existing| {
found_precompile = existing;
None
});
if let Some(precompile) = found_precompile {
extracted.push((source, dest, precompile));
}
}
for (_, dest, precompile) in extracted {
precompiles.apply_precompile(&dest, |_| Some(precompile));
}
Ok(())
}
/// Converts all [`TransactionRequest`]s into [`Recovered`] transactions and applies them to the
/// given [`BlockExecutor`].
///
@@ -359,7 +263,7 @@ where
return_data: Bytes::new(),
error: Some(SimulateError {
message: error.to_string(),
code: SIMULATE_VM_ERROR_CODE,
code: error.into().code(),
..SimulateError::invalid_params()
}),
gas_used,
@@ -374,7 +278,7 @@ where
return_data: output,
error: Some(SimulateError {
message: error.to_string(),
code: SIMULATE_REVERT_CODE,
code: error.into().code(),
..SimulateError::invalid_params()
}),
gas_used,
@@ -395,7 +299,6 @@ where
log_index: Some(log_index - 1),
transaction_index: Some(index as u64),
transaction_hash: Some(*tx.tx_hash()),
block_hash: Some(block.hash()),
block_number: Some(block.header().number()),
block_timestamp: Some(block.header().timestamp()),
..Default::default()

View File

@@ -49,14 +49,12 @@ impl<T: SignedTransaction> TransactionSource<T> {
match self {
Self::Pool(tx) => resp_builder.fill_pending(tx),
Self::Block { transaction, index, block_hash, block_number, base_fee } => {
#[allow(clippy::needless_update)]
let tx_info = TransactionInfo {
hash: Some(transaction.trie_hash()),
index: Some(index),
block_hash: Some(block_hash),
block_number: Some(block_number),
base_fee,
..Default::default()
};
resp_builder.fill(transaction, tx_info)
@@ -71,7 +69,6 @@ impl<T: SignedTransaction> TransactionSource<T> {
let hash = tx.trie_hash();
(tx, TransactionInfo { hash: Some(hash), ..Default::default() })
}
#[allow(clippy::needless_update)]
Self::Block { transaction, index, block_hash, block_number, base_fee } => {
let hash = transaction.trie_hash();
(
@@ -82,7 +79,6 @@ impl<T: SignedTransaction> TransactionSource<T> {
block_hash: Some(block_hash),
block_number: Some(block_number),
base_fee,
..Default::default()
},
)
}

View File

@@ -132,7 +132,4 @@ pub mod cache {
/// Default number of concurrent database requests.
pub const DEFAULT_CONCURRENT_DB_REQUESTS: usize = 512;
/// Default maximum number of transaction hashes to cache for lookups.
pub const DEFAULT_MAX_CACHED_TX_HASHES: u32 = 30_000;
}

View File

@@ -99,7 +99,7 @@ impl AccountHashingStage {
// Account State generator
let mut account_cursor =
provider.tx_ref().cursor_write::<tables::PlainAccountState>()?;
accounts.sort_by_key(|a| a.0);
accounts.sort_by(|a, b| a.0.cmp(&b.0));
for (addr, acc) in &accounts {
account_cursor.append(*addr, acc)?;
}

View File

@@ -1,8 +1,6 @@
use super::collect_account_history_indices;
use crate::stages::utils::{collect_history_indices, load_account_history};
use reth_config::config::{EtlConfig, IndexHistoryConfig};
#[cfg(all(unix, feature = "rocksdb"))]
use reth_db_api::Tables;
use reth_db_api::{models::ShardedKey, tables, transaction::DbTxMut};
use reth_provider::{
DBProvider, EitherWriter, HistoryWriter, PruneCheckpointReader, PruneCheckpointWriter,
@@ -113,6 +111,7 @@ where
// but this is safe for first_sync because if we crash before commit, the
// checkpoint stays at 0 and we'll just clear and rebuild again on restart. The
// source data (changesets) is intact.
#[cfg(all(unix, feature = "rocksdb"))]
provider.rocksdb_provider().clear::<tables::AccountsHistory>()?;
} else {
provider.tx_ref().clear::<tables::AccountsHistory>()?;
@@ -144,12 +143,6 @@ where
Ok(((), writer.into_raw_rocksdb_batch()))
})?;
#[cfg(all(unix, feature = "rocksdb"))]
if use_rocksdb {
provider.commit_pending_rocksdb_batches()?;
provider.rocksdb_provider().flush(&[Tables::AccountsHistory.name()])?;
}
Ok(ExecOutput { checkpoint: StageCheckpoint::new(*range.end()), done: true })
}

View File

@@ -1,8 +1,6 @@
use super::{collect_history_indices, collect_storage_history_indices};
use crate::{stages::utils::load_storage_history, StageCheckpoint, StageId};
use reth_config::config::{EtlConfig, IndexHistoryConfig};
#[cfg(all(unix, feature = "rocksdb"))]
use reth_db_api::Tables;
use reth_db_api::{
models::{storage_sharded_key::StorageShardedKey, AddressStorageKey, BlockNumberAddress},
tables,
@@ -117,6 +115,7 @@ where
// but this is safe for first_sync because if we crash before commit, the
// checkpoint stays at 0 and we'll just clear and rebuild again on restart. The
// source data (changesets) is intact.
#[cfg(all(unix, feature = "rocksdb"))]
provider.rocksdb_provider().clear::<tables::StoragesHistory>()?;
} else {
provider.tx_ref().clear::<tables::StoragesHistory>()?;
@@ -148,12 +147,6 @@ where
Ok(((), writer.into_raw_rocksdb_batch()))
})?;
#[cfg(all(unix, feature = "rocksdb"))]
if use_rocksdb {
provider.commit_pending_rocksdb_batches()?;
provider.rocksdb_provider().flush(&[Tables::StoragesHistory.name()])?;
}
Ok(ExecOutput { checkpoint: StageCheckpoint::new(*range.end()), done: true })
}

View File

@@ -10,7 +10,6 @@ use reth_prune::{
use reth_stages_api::{
ExecInput, ExecOutput, Stage, StageCheckpoint, StageError, StageId, UnwindInput, UnwindOutput,
};
use reth_storage_api::{ChangeSetReader, StorageChangeSetReader};
use tracing::info;
/// The prune stage that runs the pruner with the provided prune modes.
@@ -47,9 +46,7 @@ where
+ StageCheckpointReader
+ StaticFileProviderFactory<
Primitives: NodePrimitives<SignedTx: Value, Receipt: Value, BlockHeader: Value>,
> + StorageSettingsCache
+ ChangeSetReader
+ StorageChangeSetReader,
> + StorageSettingsCache,
{
fn id(&self) -> StageId {
StageId::Prune
@@ -154,9 +151,7 @@ where
+ StageCheckpointReader
+ StaticFileProviderFactory<
Primitives: NodePrimitives<SignedTx: Value, Receipt: Value, BlockHeader: Value>,
> + StorageSettingsCache
+ ChangeSetReader
+ StorageChangeSetReader,
> + StorageSettingsCache,
{
fn id(&self) -> StageId {
StageId::PruneSenderRecovery

Some files were not shown because too many files have changed in this diff Show More