Compare commits

..

62 Commits

Author SHA1 Message Date
Georgios Konstantopoulos
4bcd6e03db feat(trie): add trie_nodes_batch API to TrieNodeProvider trait
Adds batch node fetching capability to reduce O(n²) proof walks when
revealing multiple blinded nodes. Default implementation falls back to
sequential fetching.

Amp-Thread-ID: https://ampcode.com/threads/T-019bfe25-43f3-75ac-98f7-32bf937b69e1
Co-authored-by: Amp <amp@ampcode.com>
2026-01-27 07:34:41 +00:00
YK
11d9f38077 test(e2e): comprehensive RocksDB storage E2E tests (#21423) 2026-01-27 07:08:57 +00:00
Matthias Seitz
226ce14ca1 perf(trie): use is_zero() check to avoid copy in is_storage_empty (#21459)
Co-authored-by: Amp <amp@ampcode.com>
2026-01-27 00:42:26 +00:00
Dan Cline
a6e1dea2d7 chore: add logging for internal fcu errors (#21456) 2026-01-26 23:24:48 +00:00
Georgios Konstantopoulos
71ed68e944 perf(db): flatten HashedPostState before persisting (#21422) 2026-01-26 22:49:01 +00:00
DaniPopes
adecbd7814 chore: log docker sccache stats (#21455) 2026-01-26 22:30:20 +00:00
Matthias Seitz
26a37f3c00 chore: use Default::default() for TransactionInfo for forward compatibility (#21454)
Co-authored-by: Amp <amp@ampcode.com>
2026-01-26 22:15:41 +00:00
DaniPopes
0bfa7fa5fa ci: typorino (#21453) 2026-01-26 21:39:35 +00:00
Georgios Konstantopoulos
18bec10a0b perf(docker): use shared cache mounts for parallel builds (#21451)
Co-authored-by: Amp <amp@ampcode.com>
2026-01-26 21:00:45 +00:00
DaniPopes
1e33821e19 ci: use depot cache in Dockerfile.depot (#21450) 2026-01-26 20:37:33 +00:00
ethfanWilliam
da92733be8 fix: use unwrap_or_else for lazy evaluation of BlobParams::cancun (#21442) 2026-01-26 20:19:28 +00:00
DaniPopes
c41c8e6cae chore: reduce number of nightly builds (#21446) 2026-01-26 20:06:09 +00:00
DaniPopes
1ccc174e7b chore: remove unused docker from makefile (#21445) 2026-01-26 19:53:55 +00:00
ethfanWilliam
f1459fcf91 fix(stages): retain RocksDB TempDir in TestStageDB to prevent premature deletion (#21444) 2026-01-26 19:43:11 +00:00
Dan Cline
94235d64a8 fix(pruner): prune account and storage changeset static files (#21346) 2026-01-26 19:28:18 +00:00
Dan Cline
7fe60017cf chore(metrics): add a gas_last metric similar to new_payload_last (#21437) 2026-01-26 17:54:20 +00:00
Brian Picciano
f9ec2fafa0 refactor(trie): always use ParallelSparseTrie, deprecate config flags (#21435) 2026-01-26 17:02:06 +00:00
Arsenii Kulikov
768a687189 perf: use shared channel for prewarm workers (#21429) 2026-01-26 15:49:44 +00:00
Rez
b87cde5479 feat: configurable EVM execution limits (#21088)
Co-authored-by: Arsenii Kulikov <klkvrr@gmail.com>
2026-01-26 15:27:09 +00:00
figtracer
ab685579f0 feat(rpc): add transaction hash caching to EthStateCache (#21180)
Co-authored-by: Matthias Seitz <matthias.seitz@outlook.de>
Co-authored-by: Amp <amp@ampcode.com>
2026-01-26 14:37:53 +00:00
Matthias Seitz
c7faafd183 fix(rpc): add block timestamp validation in eth_simulateV1 (#21397)
Co-authored-by: Amp <amp@ampcode.com>
2026-01-26 14:12:28 +00:00
Matthias Seitz
935a2cc056 fix(rpc): use correct error codes for eth_simulateV1 reverts and halts (#21412)
Co-authored-by: Amp <amp@ampcode.com>
2026-01-26 14:06:38 +00:00
Matthias Seitz
507cf58db0 fix(rpc): add block number validation in eth_simulateV1 (#21396)
Co-authored-by: Amp <amp@ampcode.com>
2026-01-26 13:47:20 +00:00
Matthias Seitz
6cfd369d17 fix(rpc): populate block_hash in eth_simulateV1 logs (#21413)
Co-authored-by: Amp <amp@ampcode.com>
2026-01-26 12:41:19 +00:00
Matthias Seitz
934f462d01 feat(cli): make stopping on invalid block the default for reth import (#21403)
Co-authored-by: Amp <amp@ampcode.com>
2026-01-26 12:41:06 +00:00
Matthias Seitz
d4f28b02ff feat(rpc): implement movePrecompileToAddress for eth_simulateV1 (#21414)
Co-authored-by: Amp <amp@ampcode.com>
2026-01-26 12:40:12 +00:00
Matthias Seitz
963bfeeeed fix(rpc): set prevrandao to zero for eth_simulateV1 simulated blocks (#21399)
Co-authored-by: Amp <amp@ampcode.com>
2026-01-26 12:39:37 +00:00
Matthias Seitz
adbe6d9da0 fix(rpc): cap simulate_v1 default gas limit to RPC gas cap (#21402)
Co-authored-by: Amp <amp@ampcode.com>
2026-01-26 12:39:15 +00:00
Matthias Seitz
6d19c0ed8e fix(engine): only warn for critical capability mismatches (#21398)
Co-authored-by: Amp <amp@ampcode.com>
2026-01-26 12:36:49 +00:00
Andrey Kolishchak
4baf2baec4 fix(net): FetchFullBlockRangeFuture can get stuck forever after partial body fetch + error (#21411)
Co-authored-by: Matthias Seitz <matthias.seitz@outlook.de>
Co-authored-by: Amp <amp@ampcode.com>
2026-01-26 12:34:07 +00:00
emmmm
0b5f79e8c9 docs(rpc): add reth_subscribePersistedBlock method (#21420) 2026-01-26 10:48:35 +00:00
Georgios Konstantopoulos
afe164baca test: add E2E test for RocksDB provider functionality (#21419)
Co-authored-by: Amp <amp@ampcode.com>
Co-authored-by: yongkangc <chiayongkang@hotmail.com>
2026-01-26 10:24:10 +00:00
Hwangjae Lee
31fdbe914c docs(tracing): fix incorrect example description in lib.rs (#21417)
Signed-off-by: Hwangjae Lee <meetrick@gmail.com>
2026-01-26 10:19:36 +00:00
Ahsen Kamal
6870747246 feat(payload): add fn for system transaction check (#21407)
Signed-off-by: Ahsen Kamal <itsahsenkamal@gmail.com>
Co-authored-by: Matthias Seitz <matthias.seitz@outlook.de>
Co-authored-by: Amp <amp@ampcode.com>
2026-01-25 14:47:22 +00:00
Fallengirl
0ad8c772e1 fix(era-utils): export correct era1 CompressedBody payload (#21409) 2026-01-25 14:36:24 +00:00
github-actions[bot]
5440d0d89a chore(deps): weekly cargo update (#21406)
Co-authored-by: github-merge-queue <118344674+github-merge-queue@users.noreply.github.com>
2026-01-25 10:39:48 +00:00
Matthias Seitz
0eea4d76e9 chore: remove unused imports in storage-api (#21400)
Co-authored-by: Amp <amp@ampcode.com>
2026-01-24 15:49:21 +00:00
YK
8a1702cd74 fix(rocksdb): filter history writes to only changed accounts/storage (#21339)
Co-authored-by: Tempo AI <ai@tempo.xyz>
2026-01-24 13:07:16 +00:00
cui
7feb56d5f6 feat: prealloc vec (#21391) 2026-01-24 11:30:34 +00:00
cui
0aa922c4e8 feat: change from stable sort to unstable sort (#21387) 2026-01-24 11:21:47 +00:00
Matthias Seitz
ccff9a08f0 chore: fix clippy unnecessary_sort_by lint (#21385) 2026-01-24 03:13:49 +00:00
Matthias Seitz
eb788cc7cf fix(docker): pass vergen git vars as build args (#21384)
Co-authored-by: Amp <amp@ampcode.com>
2026-01-24 03:21:43 +01:00
Dan Cline
fb05a0654f fix(engine): use LazyTrieData::deferred for chain notification (#21383)
Co-authored-by: Matthias Seitz <matthias.seitz@outlook.de>
2026-01-23 22:32:21 +00:00
ethfanWilliam
d5a36dcc00 perf(trie): parallelize merge_ancestors_into_overlay extend ops (#21379) 2026-01-23 22:26:07 +00:00
iPLAY888
ffbef9e3cd chore: removed needless collect (#21381) 2026-01-23 21:59:19 +00:00
Dan Cline
820c112e8e feat(engine): add metric for forkchoiceUpdated response -> newPayload (#21380) 2026-01-23 21:57:15 +00:00
Alexey Shekhirin
9285f7eafc ci: use depot for docker (#20380)
Co-authored-by: Georgios Konstantopoulos <me@gakonst.com>
2026-01-23 21:14:55 +00:00
joshieDo
9a4c6d8a11 feat(rocksdb): static file based healing for TransactionHashNumbers (#21343) 2026-01-23 20:11:47 +00:00
Dan Cline
963c26550a fix(trie): only clone required keys in on_prefetch_proofs (#21378) 2026-01-23 21:13:01 +01:00
joshieDo
3648483512 feat(rocksdb): add WAL size tracking metric and Grafana dashboard (#21295)
Co-authored-by: Amp <amp@ampcode.com>
2026-01-23 19:59:10 +00:00
joshieDo
ab418642b4 fix(stages): commit RocksDB batches before flush and configure immediate WAL cleanup (#21374)
Co-authored-by: Amp <amp@ampcode.com>
Co-authored-by: Georgios Konstantopoulos <me@gakonst.com>
2026-01-23 19:28:52 +00:00
joshieDo
decb56fae1 feat(rocksdb): changeset-based crash recovery healing for history indices (#21341) 2026-01-23 19:28:10 +00:00
Matthias Seitz
ee1ec8f9f0 perf(trie): parallelize COW extend operations with rayon (#21375) 2026-01-23 19:31:04 +01:00
Georgios Konstantopoulos
d7bf87da52 feat(engine): add metric for state root task fallback success (#21371) 2026-01-23 18:21:44 +00:00
Georgios Konstantopoulos
dd0c6d279f revert: perf(trie): parallelize merge_ancestors_into_overlay (#21202) (#21370) 2026-01-23 19:09:19 +01:00
Alexey Shekhirin
c137ed836f perf(engine): fixed-cache for execution cache (#21128)
Co-authored-by: Georgios Konstantopoulos <me@gakonst.com>
Co-authored-by: Tempo AI <ai@tempo.xyz>
2026-01-23 17:57:42 +00:00
Dan Cline
a543752f7d chore(reth-bench): make from-block a required flag (#21372) 2026-01-23 17:52:33 +00:00
joshieDo
b814893221 feat(stages): flush RocksDB at end of history and tx_lookup stages (#21367) 2026-01-23 17:02:53 +00:00
Georgios Konstantopoulos
fcef82261d fix(libmdbx): handle errors gracefully in TransactionInner::drop (#21368) 2026-01-23 16:37:15 +00:00
iPLAY888
d3846d98a9 refactor: refactor get_idle_peer_for to use Iterator::find (#21321) 2026-01-23 15:56:09 +00:00
Alexey Shekhirin
1f536cce65 test(e2e): selfdestruct pre- and post-Dencun (#21363) 2026-01-23 15:41:08 +00:00
Matthias Seitz
0ddaf1b26c feat(engine): add BAL metrics type for EIP-7928 (#21356) 2026-01-23 15:17:33 +00:00
142 changed files with 5464 additions and 3057 deletions

View File

@@ -1,54 +0,0 @@
# Publishes the Docker image, only to be used with `workflow_dispatch`. The
# images from this workflow will be tagged with the git sha of the branch used
# and will NOT tag it as `latest`.
name: docker-git
on:
workflow_dispatch: {}
env:
REPO_NAME: ${{ github.repository_owner }}/reth
IMAGE_NAME: ${{ github.repository_owner }}/reth
OP_IMAGE_NAME: ${{ github.repository_owner }}/op-reth
CARGO_TERM_COLOR: always
DOCKER_IMAGE_NAME: ghcr.io/${{ github.repository_owner }}/reth
OP_DOCKER_IMAGE_NAME: ghcr.io/${{ github.repository_owner }}/op-reth
DOCKER_USERNAME: ${{ github.actor }}
GIT_SHA: ${{ github.sha }}
jobs:
build:
name: build and push
runs-on: ubuntu-24.04
permissions:
packages: write
contents: read
strategy:
fail-fast: false
matrix:
build:
- name: 'Build and push the git-sha-tagged reth image'
command: 'make PROFILE=maxperf GIT_SHA=$GIT_SHA docker-build-push-git-sha'
- name: 'Build and push the git-sha-tagged op-reth image'
command: 'make IMAGE_NAME=$OP_IMAGE_NAME DOCKER_IMAGE_NAME=$OP_DOCKER_IMAGE_NAME GIT_SHA=$GIT_SHA PROFILE=maxperf op-docker-build-push-git-sha'
steps:
- uses: actions/checkout@v6
- uses: rui314/setup-mold@v1
- uses: dtolnay/rust-toolchain@stable
- uses: Swatinem/rust-cache@v2
with:
cache-on-failure: true
- name: Install cross main
id: cross_main
run: |
cargo install cross --git https://github.com/cross-rs/cross
- name: Log in to Docker
run: |
echo "${{ secrets.GITHUB_TOKEN }}" | docker login ghcr.io --username ${DOCKER_USERNAME} --password-stdin
- name: Set up Docker builder
run: |
docker run --privileged --rm tonistiigi/binfmt --install arm64,amd64
docker buildx create --use --name cross-builder
- name: Build and push ${{ matrix.build.name }}
run: ${{ matrix.build.command }}

View File

@@ -1,65 +0,0 @@
# Publishes the nightly Docker image.
name: docker-nightly
on:
workflow_dispatch:
schedule:
- cron: "0 1 * * *"
env:
REPO_NAME: ${{ github.repository_owner }}/reth
IMAGE_NAME: ${{ github.repository_owner }}/reth
OP_IMAGE_NAME: ${{ github.repository_owner }}/op-reth
CARGO_TERM_COLOR: always
DOCKER_IMAGE_NAME: ghcr.io/${{ github.repository_owner }}/reth
OP_DOCKER_IMAGE_NAME: ghcr.io/${{ github.repository_owner }}/op-reth
DOCKER_USERNAME: ${{ github.actor }}
jobs:
build:
name: build and push
runs-on: ubuntu-24.04
permissions:
packages: write
contents: read
strategy:
fail-fast: false
matrix:
build:
- name: 'Build and push the nightly reth image'
command: 'make PROFILE=maxperf docker-build-push-nightly'
- name: 'Build and push the nightly edge profiling reth image'
command: 'make PROFILE=profiling docker-build-push-nightly-edge-profiling'
- name: 'Build and push the nightly profiling reth image'
command: 'make PROFILE=profiling docker-build-push-nightly-profiling'
- name: 'Build and push the nightly op-reth image'
command: 'make IMAGE_NAME=$OP_IMAGE_NAME DOCKER_IMAGE_NAME=$OP_DOCKER_IMAGE_NAME PROFILE=maxperf op-docker-build-push-nightly'
- name: 'Build and push the nightly edge profiling op-reth image'
command: 'make IMAGE_NAME=$OP_IMAGE_NAME DOCKER_IMAGE_NAME=$OP_DOCKER_IMAGE_NAME PROFILE=profiling op-docker-build-push-nightly-edge-profiling'
- name: 'Build and push the nightly profiling op-reth image'
command: 'make IMAGE_NAME=$OP_IMAGE_NAME DOCKER_IMAGE_NAME=$OP_DOCKER_IMAGE_NAME PROFILE=profiling op-docker-build-push-nightly-profiling'
steps:
- uses: actions/checkout@v6
- name: Remove bloatware
uses: laverdet/remove-bloatware@v1.0.0
with:
docker: true
lang: rust
- uses: rui314/setup-mold@v1
- uses: dtolnay/rust-toolchain@stable
- uses: Swatinem/rust-cache@v2
with:
cache-on-failure: true
- name: Install cross main
id: cross_main
run: |
cargo install cross --git https://github.com/cross-rs/cross
- name: Log in to Docker
run: |
echo "${{ secrets.GITHUB_TOKEN }}" | docker login ghcr.io --username ${DOCKER_USERNAME} --password-stdin
- name: Set up Docker builder
run: |
docker run --privileged --rm tonistiigi/binfmt --install arm64,amd64
docker buildx create --use --name cross-builder
- name: Build and push ${{ matrix.build.name }}
run: ${{ matrix.build.command }}

View File

@@ -1,4 +1,9 @@
# Publishes the Docker image.
# Publishes Docker images.
#
# Triggers:
# - Push tag v*: builds release (RC or latest)
# - Schedule: builds nightly + profiling
# - Manual: builds git-sha or nightly
name: docker
@@ -6,84 +11,94 @@ on:
push:
tags:
- v*
env:
IMAGE_NAME: ${{ github.repository_owner }}/reth
OP_IMAGE_NAME: ${{ github.repository_owner }}/op-reth
CARGO_TERM_COLOR: always
DOCKER_IMAGE_NAME: ghcr.io/${{ github.repository_owner }}/reth
OP_DOCKER_IMAGE_NAME: ghcr.io/${{ github.repository_owner }}/op-reth
DOCKER_USERNAME: ${{ github.actor }}
schedule:
- cron: "0 1 * * *"
workflow_dispatch:
inputs:
build_type:
description: "Build type"
required: true
type: choice
options:
- git-sha
- nightly
default: git-sha
dry_run:
description: "Skip pushing images (dry run)"
required: false
type: boolean
default: false
jobs:
build-rc:
if: contains(github.ref, '-rc')
name: build and push as release candidate
runs-on: ubuntu-24.04
permissions:
packages: write
contents: read
strategy:
fail-fast: false
matrix:
build:
- name: "Build and push reth image"
command: "make IMAGE_NAME=$IMAGE_NAME DOCKER_IMAGE_NAME=$DOCKER_IMAGE_NAME PROFILE=maxperf docker-build-push"
- name: "Build and push op-reth image"
command: "make IMAGE_NAME=$OP_IMAGE_NAME DOCKER_IMAGE_NAME=$OP_DOCKER_IMAGE_NAME PROFILE=maxperf op-docker-build-push"
steps:
- uses: actions/checkout@v6
- uses: rui314/setup-mold@v1
- uses: dtolnay/rust-toolchain@stable
- uses: Swatinem/rust-cache@v2
with:
cache-on-failure: true
- name: Install cross main
id: cross_main
run: |
cargo install cross --git https://github.com/cross-rs/cross
- name: Log in to Docker
run: |
echo "${{ secrets.GITHUB_TOKEN }}" | docker login ghcr.io --username ${DOCKER_USERNAME} --password-stdin
- name: Set up Docker builder
run: |
docker run --privileged --rm tonistiigi/binfmt --install arm64,amd64
docker buildx create --use --name cross-builder
- name: Build and push ${{ matrix.build.name }}
run: ${{ matrix.build.command }}
build:
if: ${{ !contains(github.ref, '-rc') }}
name: build and push as latest
name: Build Docker images
runs-on: ubuntu-24.04
permissions:
packages: write
contents: read
strategy:
fail-fast: false
matrix:
build:
- name: "Build and push reth image"
command: "make IMAGE_NAME=$IMAGE_NAME DOCKER_IMAGE_NAME=$DOCKER_IMAGE_NAME PROFILE=maxperf docker-build-push-latest"
- name: "Build and push op-reth image"
command: "make IMAGE_NAME=$OP_IMAGE_NAME DOCKER_IMAGE_NAME=$OP_DOCKER_IMAGE_NAME PROFILE=maxperf op-docker-build-push-latest"
id-token: write
steps:
- uses: actions/checkout@v6
- uses: rui314/setup-mold@v1
- uses: dtolnay/rust-toolchain@stable
- uses: Swatinem/rust-cache@v2
- name: Set up Depot CLI
uses: depot/setup-action@v1
- name: Log in to GHCR
uses: docker/login-action@v3
with:
cache-on-failure: true
- name: Install cross main
id: cross_main
registry: ghcr.io
username: ${{ github.actor }}
password: ${{ secrets.GITHUB_TOKEN }}
- name: Get git info for vergen
id: git
run: |
cargo install cross --git https://github.com/cross-rs/cross
- name: Log in to Docker
echo "sha=${{ github.sha }}" >> "$GITHUB_OUTPUT"
echo "describe=$(git describe --always --tags)" >> "$GITHUB_OUTPUT"
echo "dirty=false" >> "$GITHUB_OUTPUT"
- name: Determine build parameters
id: params
run: |
echo "${{ secrets.GITHUB_TOKEN }}" | docker login ghcr.io --username ${DOCKER_USERNAME} --password-stdin
- name: Set up Docker builder
run: |
docker run --privileged --rm tonistiigi/binfmt --install arm64,amd64
docker buildx create --use --name cross-builder
- name: Build and push ${{ matrix.build.name }}
run: ${{ matrix.build.command }}
REGISTRY="ghcr.io/${{ github.repository_owner }}"
if [[ "${{ github.event_name }}" == "push" ]]; then
VERSION="${GITHUB_REF#refs/tags/}"
echo "targets=ethereum optimism" >> "$GITHUB_OUTPUT"
# Add 'latest' tag for non-RC releases
if [[ ! "$VERSION" =~ -rc ]]; then
echo "ethereum_tags=${REGISTRY}/reth:${VERSION},${REGISTRY}/reth:latest" >> "$GITHUB_OUTPUT"
echo "optimism_tags=${REGISTRY}/op-reth:${VERSION},${REGISTRY}/op-reth:latest" >> "$GITHUB_OUTPUT"
else
echo "ethereum_tags=${REGISTRY}/reth:${VERSION}" >> "$GITHUB_OUTPUT"
echo "optimism_tags=${REGISTRY}/op-reth:${VERSION}" >> "$GITHUB_OUTPUT"
fi
elif [[ "${{ github.event_name }}" == "schedule" ]] || [[ "${{ inputs.build_type }}" == "nightly" ]]; then
echo "targets=nightly" >> "$GITHUB_OUTPUT"
echo "ethereum_tags=${REGISTRY}/reth:nightly" >> "$GITHUB_OUTPUT"
echo "optimism_tags=${REGISTRY}/op-reth:nightly" >> "$GITHUB_OUTPUT"
else
# git-sha build
echo "targets=ethereum optimism" >> "$GITHUB_OUTPUT"
echo "ethereum_tags=${REGISTRY}/reth:${{ github.sha }}" >> "$GITHUB_OUTPUT"
echo "optimism_tags=${REGISTRY}/op-reth:${{ github.sha }}" >> "$GITHUB_OUTPUT"
fi
- name: Build and push images
uses: depot/bake-action@v1
env:
VERGEN_GIT_SHA: ${{ steps.git.outputs.sha }}
VERGEN_GIT_DESCRIBE: ${{ steps.git.outputs.describe }}
VERGEN_GIT_DIRTY: ${{ steps.git.outputs.dirty }}
DEPOT_TOKEN: ${{ secrets.DEPOT_TOKEN }}
with:
project: ${{ vars.DEPOT_PROJECT_ID }}
files: docker-bake.hcl
targets: ${{ steps.params.outputs.targets }}
push: ${{ !(github.event_name == 'workflow_dispatch' && inputs.dry_run) }}
set: |
ethereum.tags=${{ steps.params.outputs.ethereum_tags }}
optimism.tags=${{ steps.params.outputs.optimism_tags }}

184
Cargo.lock generated
View File

@@ -106,9 +106,9 @@ checksum = "683d7910e743518b0e34f1186f92494becacb047c7b6bf616c96772180fef923"
[[package]]
name = "alloy-chains"
version = "0.2.28"
version = "0.2.30"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "3842d8c52fcd3378039f4703dba392dca8b546b1c8ed6183048f8dab95b2be78"
checksum = "90f374d3c6d729268bbe2d0e0ff992bb97898b2df756691a62ee1d5f0506bc39"
dependencies = [
"alloy-primitives",
"alloy-rlp",
@@ -249,9 +249,9 @@ dependencies = [
[[package]]
name = "alloy-eip7928"
version = "0.3.0"
version = "0.3.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6adac476434bf024279164dcdca299309f0c7d1e3557024eb7a83f8d9d01c6b5"
checksum = "d3231de68d5d6e75332b7489cfcc7f4dfabeba94d990a10e4b923af0e6623540"
dependencies = [
"alloy-primitives",
"alloy-rlp",
@@ -495,7 +495,7 @@ dependencies = [
"async-stream",
"async-trait",
"auto_impl",
"dashmap 6.1.0",
"dashmap",
"either",
"futures",
"futures-utils-wasm",
@@ -1774,7 +1774,7 @@ dependencies = [
"bytemuck",
"cfg-if",
"cow-utils",
"dashmap 6.1.0",
"dashmap",
"dynify",
"fast-float2",
"float16",
@@ -1968,12 +1968,6 @@ version = "1.2.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7575182f7272186991736b70173b0ea045398f984bf5ebbb3804736ce1330c9d"
[[package]]
name = "bytecount"
version = "0.6.9"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "175812e0be2bccb6abe50bb8d566126198344f707e304f45c648fd8f2cc0365e"
[[package]]
name = "bytemuck"
version = "1.24.0"
@@ -2063,19 +2057,6 @@ dependencies = [
"serde_core",
]
[[package]]
name = "cargo_metadata"
version = "0.14.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "4acbb09d9ee8e23699b9634375c72795d095bf268439da88562cf9b501f181fa"
dependencies = [
"camino",
"cargo-platform 0.1.9",
"semver 1.0.27",
"serde",
"serde_json",
]
[[package]]
name = "cargo_metadata"
version = "0.19.2"
@@ -2127,9 +2108,9 @@ dependencies = [
[[package]]
name = "cc"
version = "1.2.53"
version = "1.2.54"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "755d2fce177175ffca841e9a06afdb2c4ab0f593d53b4dee48147dfaade85932"
checksum = "6354c81bbfd62d9cfa9cb3c773c2b7b2a3a482d569de977fd0e961f6e7c00583"
dependencies = [
"find-msvc-tools",
"jobserver",
@@ -2930,19 +2911,6 @@ dependencies = [
"syn 2.0.114",
]
[[package]]
name = "dashmap"
version = "5.5.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "978747c1d849a7d2ee5e8adc0159961c48fb7e5db2f06af6723b80123bb53856"
dependencies = [
"cfg-if",
"hashbrown 0.14.5",
"lock_api",
"once_cell",
"parking_lot_core",
]
[[package]]
name = "dashmap"
version = "6.1.0"
@@ -3495,15 +3463,6 @@ dependencies = [
"windows-sys 0.61.2",
]
[[package]]
name = "error-chain"
version = "0.12.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "2d2f06b9cac1506ece98fe3231e3cc9c4410ec3d5b1f24ae1c8946f0742cdefc"
dependencies = [
"version_check",
]
[[package]]
name = "ethereum_hashing"
version = "0.7.0"
@@ -4082,11 +4041,12 @@ checksum = "8591b0bcc8a98a64310a2fae1bb3e9b8564dd10e381e6e28010fde8e8e8568db"
[[package]]
name = "fixed-cache"
version = "0.1.5"
version = "0.1.7"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "25d3af83468398d500e9bc19e001812dcb1a11e4d3d6a5956c789aa3c11a8cb5"
checksum = "0aaafa7294e9617eb29e5c684a3af33324ef512a1bf596af2d1938a03798da29"
dependencies = [
"equivalent",
"typeid",
]
[[package]]
@@ -4851,7 +4811,7 @@ dependencies = [
"libc",
"percent-encoding",
"pin-project-lite",
"socket2 0.6.1",
"socket2 0.6.2",
"tokio",
"tower-service",
"tracing",
@@ -5593,9 +5553,9 @@ dependencies = [
[[package]]
name = "libm"
version = "0.2.15"
version = "0.2.16"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f9fbbcab51052fe104eb5e5d351cf728d30a5be1fe14d9be8a3b097481fb97de"
checksum = "b6d2cec3eae94f9f509c767b45932f1ada8350c4bdb85af2fcab4a3c14807981"
[[package]]
name = "libp2p-identity"
@@ -5967,21 +5927,6 @@ dependencies = [
"unicase",
]
[[package]]
name = "mini-moka"
version = "0.10.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c325dfab65f261f386debee8b0969da215b3fa0037e74c8a1234db7ba986d803"
dependencies = [
"crossbeam-channel",
"crossbeam-utils",
"dashmap 5.5.3",
"skeptic",
"smallvec",
"tagptr",
"triomphe",
]
[[package]]
name = "minimal-lexical"
version = "0.2.1"
@@ -6208,9 +6153,9 @@ dependencies = [
[[package]]
name = "num-conv"
version = "0.1.0"
version = "0.2.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "51d515d32fb182ee37cda2ccdcb92950d6a3c2893aa280e540671c2cd0f3b1d9"
checksum = "cf97ec579c3c42f953ef76dbf8d55ac91fb219dde70e49aa4a6b7d74e9919050"
[[package]]
name = "num-integer"
@@ -6515,9 +6460,9 @@ checksum = "c08d65885ee38876c4f86fa503fb49d7b507c2b62552df7c70b2fce627e06381"
[[package]]
name = "openssl-probe"
version = "0.2.0"
version = "0.2.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9f50d9b3dabb09ecd771ad0aa242ca6894994c130308ca3d7684634df8037391"
checksum = "7c87def4c32ab89d880effc9e097653c8da5d6ef28e6b539d313baaacfbafcbe"
[[package]]
name = "opentelemetry"
@@ -7022,9 +6967,9 @@ dependencies = [
[[package]]
name = "proc-macro2"
version = "1.0.105"
version = "1.0.106"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "535d180e0ecab6268a3e718bb9fd44db66bbbc256257165fc699dadf70d16fe7"
checksum = "8fd00f0bb2e90d81d1044c2b32617f68fcb9fa3bb7640c23e9c748e53fb30934"
dependencies = [
"unicode-ident",
]
@@ -7172,17 +7117,6 @@ dependencies = [
"syn 2.0.114",
]
[[package]]
name = "pulldown-cmark"
version = "0.9.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "57206b407293d2bcd3af849ce869d52068623f19e1b5ff8e8778e3309439682b"
dependencies = [
"bitflags 2.10.0",
"memchr",
"unicase",
]
[[package]]
name = "quanta"
version = "0.12.6"
@@ -7226,7 +7160,7 @@ dependencies = [
"quinn-udp",
"rustc-hash",
"rustls",
"socket2 0.6.1",
"socket2 0.6.2",
"thiserror 2.0.18",
"tokio",
"tracing",
@@ -7263,16 +7197,16 @@ dependencies = [
"cfg_aliases",
"libc",
"once_cell",
"socket2 0.6.1",
"socket2 0.6.2",
"tracing",
"windows-sys 0.60.2",
]
[[package]]
name = "quote"
version = "1.0.43"
version = "1.0.44"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "dc74d9a594b72ae6656596548f56f667211f8a97b3d4c3d467150794690dc40a"
checksum = "21b2ebcf727b7760c461f091f9f0f539b77b8e87f2fd88131e7f1b433b3cece4"
dependencies = [
"proc-macro2",
]
@@ -8465,13 +8399,13 @@ dependencies = [
"assert_matches",
"codspeed-criterion-compat",
"crossbeam-channel",
"dashmap 6.1.0",
"dashmap",
"derive_more",
"eyre",
"fixed-cache",
"futures",
"metrics",
"metrics-util",
"mini-moka",
"moka",
"parking_lot",
"proptest",
@@ -9111,7 +9045,7 @@ dependencies = [
"bitflags 2.10.0",
"byteorder",
"codspeed-criterion-compat",
"dashmap 6.1.0",
"dashmap",
"derive_more",
"parking_lot",
"rand 0.9.2",
@@ -9196,6 +9130,7 @@ dependencies = [
"reth-eth-wire-types",
"reth-ethereum-forks",
"reth-ethereum-primitives",
"reth-evm-ethereum",
"reth-fs-util",
"reth-metrics",
"reth-net-banlist",
@@ -10063,6 +9998,7 @@ dependencies = [
"parking_lot",
"reth-chain-state",
"reth-chainspec",
"reth-evm",
"reth-metrics",
"reth-optimism-chainspec",
"reth-optimism-evm",
@@ -10218,7 +10154,7 @@ dependencies = [
"alloy-primitives",
"alloy-rpc-types-engine",
"assert_matches",
"dashmap 6.1.0",
"dashmap",
"eyre",
"itertools 0.14.0",
"metrics",
@@ -10281,6 +10217,7 @@ dependencies = [
"reth-stages",
"reth-stages-types",
"reth-static-file-types",
"reth-storage-api",
"reth-testing-utils",
"reth-tokio-util",
"reth-tracing",
@@ -10628,7 +10565,6 @@ dependencies = [
"jsonrpsee-core",
"jsonrpsee-types",
"metrics",
"parking_lot",
"reth-chainspec",
"reth-engine-primitives",
"reth-ethereum-engine-primitives",
@@ -11118,6 +11054,8 @@ dependencies = [
"reth-chainspec",
"reth-eth-wire-types",
"reth-ethereum-primitives",
"reth-evm",
"reth-evm-ethereum",
"reth-execution-types",
"reth-fs-util",
"reth-metrics",
@@ -11126,6 +11064,7 @@ dependencies = [
"reth-storage-api",
"reth-tasks",
"reth-tracing",
"revm",
"revm-interpreter",
"revm-primitives",
"rustc-hash",
@@ -11246,7 +11185,7 @@ dependencies = [
"alloy-rlp",
"codspeed-criterion-compat",
"crossbeam-channel",
"dashmap 6.1.0",
"dashmap",
"derive_more",
"itertools 0.14.0",
"metrics",
@@ -11469,9 +11408,9 @@ dependencies = [
[[package]]
name = "revm-inspectors"
version = "0.34.0"
version = "0.34.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "4a1ce3f52a052d78cc251714d57bf05dc8bc75e269677de11805d3153300a2cd"
checksum = "a24ca988ae1f7a0bb5688630579c00e867cd9f1df0a2f040623887f63d3b414c"
dependencies = [
"alloy-primitives",
"alloy-rpc-types-eth",
@@ -12382,21 +12321,6 @@ version = "1.0.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "56199f7ddabf13fe5074ce809e7d3f42b42ae711800501b5b16ea82ad029c39d"
[[package]]
name = "skeptic"
version = "0.13.7"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "16d23b015676c90a0f01c197bfdc786c20342c73a0afdda9025adb0bc42940a8"
dependencies = [
"bytecount",
"cargo_metadata 0.14.2",
"error-chain",
"glob",
"pulldown-cmark",
"tempfile",
"walkdir",
]
[[package]]
name = "sketches-ddsketch"
version = "0.3.0"
@@ -12465,9 +12389,9 @@ dependencies = [
[[package]]
name = "socket2"
version = "0.6.1"
version = "0.6.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "17129e116933cf371d018bb80ae557e889637989d8638274fb25622827b03881"
checksum = "86f4aa3ad99f2088c990dfa82d367e19cb29268ed67c574d10d0a4bfe71f07e0"
dependencies = [
"libc",
"windows-sys 0.60.2",
@@ -12868,9 +12792,9 @@ dependencies = [
[[package]]
name = "time"
version = "0.3.45"
version = "0.3.46"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f9e442fc33d7fdb45aa9bfeb312c095964abdf596f7567261062b2a7107aaabd"
checksum = "9da98b7d9b7dad93488a84b8248efc35352b0b2657397d4167e7ad67e5d535e5"
dependencies = [
"deranged",
"itoa",
@@ -12886,15 +12810,15 @@ dependencies = [
[[package]]
name = "time-core"
version = "0.1.7"
version = "0.1.8"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8b36ee98fd31ec7426d599183e8fe26932a8dc1fb76ddb6214d05493377d34ca"
checksum = "7694e1cfe791f8d31026952abf09c69ca6f6fa4e1a1229e18988f06a04a12dca"
[[package]]
name = "time-macros"
version = "0.2.25"
version = "0.2.26"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "71e552d1249bf61ac2a52db88179fd0673def1e1ad8243a00d9ec9ed71fee3dd"
checksum = "78cc610bac2dcee56805c99642447d4c5dbde4d01f752ffea0199aee1f601dc4"
dependencies = [
"num-conv",
"time-core",
@@ -12957,7 +12881,7 @@ dependencies = [
"parking_lot",
"pin-project-lite",
"signal-hook-registry",
"socket2 0.6.1",
"socket2 0.6.2",
"tokio-macros",
"windows-sys 0.61.2",
]
@@ -13437,12 +13361,6 @@ dependencies = [
"rlp",
]
[[package]]
name = "triomphe"
version = "0.1.15"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "dd69c5aa8f924c7519d6372789a74eac5b94fb0f8fcf0d4a97eb0bfc3e785f39"
[[package]]
name = "try-lock"
version = "0.2.5"
@@ -13468,6 +13386,12 @@ dependencies = [
"utf-8",
]
[[package]]
name = "typeid"
version = "1.0.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "bc7d623258602320d5c55d1bc22793b57daff0ec7efc270ea7d55ce1d5f5471c"
[[package]]
name = "typenum"
version = "1.19.0"
@@ -13624,9 +13548,9 @@ checksum = "06abde3611657adf66d383f00b093d7faecc7fa57071cce2578660c9f1010821"
[[package]]
name = "uuid"
version = "1.19.0"
version = "1.20.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e2e054861b4bd027cd373e18e8d8d8e6548085000e41290d95ce0c373a654b4a"
checksum = "ee48d38b119b0cd71fe4141b30f5ba9c7c5d9f4e7a3a8b4a674e4b6ef789976f"
dependencies = [
"getrandom 0.3.4",
"js-sys",

View File

@@ -481,7 +481,7 @@ revm-primitives = { version = "22.0.0", default-features = false }
revm-interpreter = { version = "32.0.0", default-features = false }
revm-database-interface = { version = "9.0.0", default-features = false }
op-revm = { version = "15.0.0", default-features = false }
revm-inspectors = "0.34.0"
revm-inspectors = "0.34.1"
# eth
alloy-chains = { version = "0.2.5", default-features = false }
@@ -588,7 +588,7 @@ tracing-appender = "0.2"
url = { version = "2.3", default-features = false }
zstd = "0.13"
byteorder = "1"
mini-moka = "0.10"
fixed-cache = { version = "0.1.7", features = ["stats"] }
moka = "0.12"
tar-no-std = { version = "0.3.2", default-features = false }
miniz_oxide = { version = "0.8.4", default-features = false }

View File

@@ -1,15 +0,0 @@
# This image is meant to enable cross-architecture builds.
# It assumes the reth binary has already been compiled for `$TARGETPLATFORM` and is
# locatable in `./dist/bin/$TARGETARCH`
FROM --platform=$TARGETPLATFORM ubuntu:22.04
LABEL org.opencontainers.image.source=https://github.com/paradigmxyz/reth
LABEL org.opencontainers.image.licenses="MIT OR Apache-2.0"
# Filled by docker buildx
ARG TARGETARCH
COPY ./dist/bin/$TARGETARCH/reth /usr/local/bin/reth
EXPOSE 30303 30303/udp 9001 8545 8546
ENTRYPOINT ["/usr/local/bin/reth"]

99
Dockerfile.depot Normal file
View File

@@ -0,0 +1,99 @@
# syntax=docker/dockerfile:1
# Unified Dockerfile for reth and op-reth, optimized for Depot builds
# Usage:
# reth: --build-arg BINARY=reth
# op-reth: --build-arg BINARY=op-reth --build-arg MANIFEST_PATH=crates/optimism/bin
FROM lukemathwalker/cargo-chef:latest-rust-1 AS chef
WORKDIR /app
LABEL org.opencontainers.image.source=https://github.com/paradigmxyz/reth
LABEL org.opencontainers.image.licenses="MIT OR Apache-2.0"
RUN apt-get update && apt-get install -y libclang-dev pkg-config
# Install sccache for compilation caching
RUN cargo install sccache --locked
ENV RUSTC_WRAPPER=sccache
ENV SCCACHE_DIR=/sccache
ENV SCCACHE_WEBDAV_ENDPOINT=https://cache.depot.dev
# Builds a cargo-chef plan
FROM chef AS planner
COPY --exclude=.git . .
RUN cargo chef prepare --recipe-path recipe.json
FROM chef AS builder
COPY --from=planner /app/recipe.json recipe.json
# Binary to build (reth or op-reth)
ARG BINARY=reth
# Manifest path for the binary
ARG MANIFEST_PATH=bin/reth
# Build profile, release by default
ARG BUILD_PROFILE=release
ENV BUILD_PROFILE=$BUILD_PROFILE
# Extra Cargo flags
ARG RUSTFLAGS=""
ENV RUSTFLAGS="$RUSTFLAGS"
# Extra Cargo features
ARG FEATURES=""
ENV FEATURES=$FEATURES
# Git info for vergen (since .git is excluded from Docker context)
ARG VERGEN_GIT_SHA=""
ARG VERGEN_GIT_DESCRIBE=""
ARG VERGEN_GIT_DIRTY="false"
ENV VERGEN_GIT_SHA=$VERGEN_GIT_SHA
ENV VERGEN_GIT_DESCRIBE=$VERGEN_GIT_DESCRIBE
ENV VERGEN_GIT_DIRTY=$VERGEN_GIT_DIRTY
# Build dependencies
RUN --mount=type=secret,id=DEPOT_TOKEN,env=SCCACHE_WEBDAV_TOKEN \
--mount=type=cache,target=/usr/local/cargo/registry,sharing=shared \
--mount=type=cache,target=/usr/local/cargo/git,sharing=shared \
--mount=type=cache,target=$SCCACHE_DIR,sharing=shared \
cargo chef cook --profile $BUILD_PROFILE --features "$FEATURES" --locked --recipe-path recipe.json --manifest-path $MANIFEST_PATH/Cargo.toml
# Build application
COPY --exclude=.git . .
RUN --mount=type=secret,id=DEPOT_TOKEN,env=SCCACHE_WEBDAV_TOKEN \
--mount=type=cache,target=/usr/local/cargo/registry,sharing=shared \
--mount=type=cache,target=/usr/local/cargo/git,sharing=shared \
--mount=type=cache,target=$SCCACHE_DIR,sharing=shared \
cargo build --profile $BUILD_PROFILE --features "$FEATURES" --locked --bin $BINARY --manifest-path $MANIFEST_PATH/Cargo.toml
RUN sccache --show-stats || true
# Copy binary to a known location (ARG not resolved in COPY)
# Note: Custom profiles like maxperf/profiling output to target/<profile>/, not target/release/
RUN cp /app/target/$BUILD_PROFILE/$BINARY /app/binary || \
cp /app/target/release/$BINARY /app/binary
FROM ubuntu:24.04 AS runtime
WORKDIR /app
# Binary name for entrypoint
ARG BINARY=reth
# Install runtime dependencies
RUN apt-get update && \
apt-get install -y --no-install-recommends ca-certificates && \
rm -rf /var/lib/apt/lists/*
# Copy binary from build stage and create canonical symlink for entrypoint
COPY --from=builder /app/binary /usr/local/bin/
RUN mv /usr/local/bin/binary /usr/local/bin/$BINARY && \
ln -s /usr/local/bin/$BINARY /usr/local/bin/reth-binary && \
chmod +x /usr/local/bin/$BINARY
# Copy licenses
COPY LICENSE-* ./
EXPOSE 30303 30303/udp 9001 8545 8546
ENTRYPOINT ["/usr/local/bin/reth-binary"]

View File

@@ -1,15 +0,0 @@
# This image is meant to enable cross-architecture builds.
# It assumes the reth binary has already been compiled for `$TARGETPLATFORM` and is
# locatable in `./dist/bin/$TARGETARCH`
FROM --platform=$TARGETPLATFORM ubuntu:22.04
LABEL org.opencontainers.image.source=https://github.com/paradigmxyz/reth
LABEL org.opencontainers.image.licenses="MIT OR Apache-2.0"
# Filled by docker buildx
ARG TARGETARCH
COPY ./dist/bin/$TARGETARCH/op-reth /usr/local/bin/op-reth
EXPOSE 30303 30303/udp 9001 8545 8546
ENTRYPOINT ["/usr/local/bin/op-reth"]

134
Makefile
View File

@@ -35,9 +35,6 @@ EEST_TESTS_TAG := v4.5.0
EEST_TESTS_URL := https://github.com/ethereum/execution-spec-tests/releases/download/$(EEST_TESTS_TAG)/fixtures_stable.tar.gz
EEST_TESTS_DIR := ./testing/ef-tests/execution-spec-tests
# The docker image name
DOCKER_IMAGE_NAME ?= ghcr.io/paradigmxyz/reth
##@ Help
.PHONY: help
@@ -242,137 +239,6 @@ install-reth-bench: ## Build and install the reth binary under `$(CARGO_HOME)/bi
--features "$(FEATURES)" \
--profile "$(PROFILE)"
##@ Docker
# Note: This requires a buildx builder with emulation support. For example:
#
# `docker run --privileged --rm tonistiigi/binfmt --install amd64,arm64`
# `docker buildx create --use --driver docker-container --name cross-builder`
.PHONY: docker-build-push
docker-build-push: ## Build and push a cross-arch Docker image tagged with the latest git tag.
$(call docker_build_push,$(GIT_TAG),$(GIT_TAG))
# Note: This requires a buildx builder with emulation support. For example:
#
# `docker run --privileged --rm tonistiigi/binfmt --install amd64,arm64`
# `docker buildx create --use --driver docker-container --name cross-builder`
.PHONY: docker-build-push-git-sha
docker-build-push-git-sha: ## Build and push a cross-arch Docker image tagged with the latest git sha.
$(call docker_build_push,$(GIT_SHA),$(GIT_SHA))
# Note: This requires a buildx builder with emulation support. For example:
#
# `docker run --privileged --rm tonistiigi/binfmt --install amd64,arm64`
# `docker buildx create --use --driver docker-container --name cross-builder`
.PHONY: docker-build-push-latest
docker-build-push-latest: ## Build and push a cross-arch Docker image tagged with the latest git tag and `latest`.
$(call docker_build_push,$(GIT_TAG),latest)
# Note: This requires a buildx builder with emulation support. For example:
#
# `docker run --privileged --rm tonistiigi/binfmt --install amd64,arm64`
# `docker buildx create --use --name cross-builder`
.PHONY: docker-build-push-nightly
docker-build-push-nightly: ## Build and push cross-arch Docker image tagged with the latest git tag with a `-nightly` suffix, and `latest-nightly`.
$(call docker_build_push,nightly,nightly)
.PHONY: docker-build-push-nightly-edge-profiling
docker-build-push-nightly-edge-profiling: FEATURES := $(FEATURES) edge
docker-build-push-nightly-edge-profiling: ## Build and push cross-arch Docker image with edge features tagged with `nightly-edge-profiling`.
$(call docker_build_push,nightly-edge-profiling,nightly-edge-profiling)
# Create a cross-arch Docker image with the given tags and push it
define docker_build_push
$(MAKE) FEATURES="$(FEATURES)" build-x86_64-unknown-linux-gnu
mkdir -p $(BIN_DIR)/amd64
cp $(CARGO_TARGET_DIR)/x86_64-unknown-linux-gnu/$(PROFILE)/reth $(BIN_DIR)/amd64/reth
$(MAKE) FEATURES="$(FEATURES)" build-aarch64-unknown-linux-gnu
mkdir -p $(BIN_DIR)/arm64
cp $(CARGO_TARGET_DIR)/aarch64-unknown-linux-gnu/$(PROFILE)/reth $(BIN_DIR)/arm64/reth
docker buildx build --file ./Dockerfile.cross . \
--platform linux/amd64,linux/arm64 \
--tag $(DOCKER_IMAGE_NAME):$(1) \
--tag $(DOCKER_IMAGE_NAME):$(2) \
--provenance=false \
--push
endef
##@ Optimism docker
# Note: This requires a buildx builder with emulation support. For example:
#
# `docker run --privileged --rm tonistiigi/binfmt --install amd64,arm64`
# `docker buildx create --use --driver docker-container --name cross-builder`
.PHONY: op-docker-build-push
op-docker-build-push: ## Build and push a cross-arch Docker image tagged with the latest git tag.
$(call op_docker_build_push,$(GIT_TAG),$(GIT_TAG))
# Note: This requires a buildx builder with emulation support. For example:
#
# `docker run --privileged --rm tonistiigi/binfmt --install amd64,arm64`
# `docker buildx create --use --driver docker-container --name cross-builder`
.PHONY: op-docker-build-push-git-sha
op-docker-build-push-git-sha: ## Build and push a cross-arch Docker image tagged with the latest git sha.
$(call op_docker_build_push,$(GIT_SHA),$(GIT_SHA))
# Note: This requires a buildx builder with emulation support. For example:
#
# `docker run --privileged --rm tonistiigi/binfmt --install amd64,arm64`
# `docker buildx create --use --driver docker-container --name cross-builder`
.PHONY: op-docker-build-push-latest
op-docker-build-push-latest: ## Build and push a cross-arch Docker image tagged with the latest git tag and `latest`.
$(call op_docker_build_push,$(GIT_TAG),latest)
# Note: This requires a buildx builder with emulation support. For example:
#
# `docker run --privileged --rm tonistiigi/binfmt --install amd64,arm64`
# `docker buildx create --use --name cross-builder`
.PHONY: op-docker-build-push-nightly
op-docker-build-push-nightly: ## Build and push cross-arch Docker image tagged with the latest git tag with a `-nightly` suffix, and `latest-nightly`.
$(call op_docker_build_push,nightly,nightly)
.PHONY: op-docker-build-push-nightly-edge-profiling
op-docker-build-push-nightly-edge-profiling: FEATURES := $(FEATURES) edge
op-docker-build-push-nightly-edge-profiling: ## Build and push cross-arch Docker image with edge features tagged with `nightly-edge-profiling`.
$(call op_docker_build_push,nightly-edge-profiling,nightly-edge-profiling)
# Note: This requires a buildx builder with emulation support. For example:
#
# `docker run --privileged --rm tonistiigi/binfmt --install amd64,arm64`
# `docker buildx create --use --name cross-builder`
.PHONY: docker-build-push-nightly-profiling
docker-build-push-nightly-profiling: ## Build and push cross-arch Docker image with profiling profile tagged with nightly-profiling.
$(call docker_build_push,nightly-profiling,nightly-profiling)
# Note: This requires a buildx builder with emulation support. For example:
#
# `docker run --privileged --rm tonistiigi/binfmt --install amd64,arm64`
# `docker buildx create --use --name cross-builder`
.PHONY: op-docker-build-push-nightly-profiling
op-docker-build-push-nightly-profiling: ## Build and push cross-arch Docker image tagged with the latest git tag with a `-nightly` suffix, and `latest-nightly`.
$(call op_docker_build_push,nightly-profiling,nightly-profiling)
# Create a cross-arch Docker image with the given tags and push it
define op_docker_build_push
$(MAKE) FEATURES="$(FEATURES)" op-build-x86_64-unknown-linux-gnu
mkdir -p $(BIN_DIR)/amd64
cp $(CARGO_TARGET_DIR)/x86_64-unknown-linux-gnu/$(PROFILE)/op-reth $(BIN_DIR)/amd64/op-reth
$(MAKE) FEATURES="$(FEATURES)" op-build-aarch64-unknown-linux-gnu
mkdir -p $(BIN_DIR)/arm64
cp $(CARGO_TARGET_DIR)/aarch64-unknown-linux-gnu/$(PROFILE)/op-reth $(BIN_DIR)/arm64/op-reth
docker buildx build --file ./DockerfileOp.cross . \
--platform linux/amd64,linux/arm64 \
--tag $(DOCKER_IMAGE_NAME):$(1) \
--tag $(DOCKER_IMAGE_NAME):$(2) \
--provenance=false \
--push
endef
##@ Other
.PHONY: clean

View File

@@ -209,10 +209,21 @@ pub struct Command {
#[arg(long, value_name = "TARGET_GAS", default_value = "30000000", value_parser = parse_gas_limit)]
target_gas: u64,
/// Starting block number to fetch transactions from.
/// If not specified, starts from the engine's latest block.
/// Block number to start fetching transactions from (required).
///
/// This must be the last canonical block BEFORE any gas limit ramping was performed.
/// The command collects transactions from historical blocks starting at this number
/// to pack into large blocks.
///
/// How to determine this value:
/// - If starting from a fresh node (no gas limit ramp yet): use the current chain tip
/// - If gas limit ramping has already been performed: use the block number that was the chain
/// tip BEFORE ramping began (you must track this yourself)
///
/// Using a block after ramping started will cause transaction collection to fail
/// because those blocks contain synthetic transactions that cannot be replayed.
#[arg(long, value_name = "FROM_BLOCK")]
from_block: Option<u64>,
from_block: u64,
/// Execute the payload (call newPayload + forkchoiceUpdated).
/// If false, only builds the payload and prints it.
@@ -288,7 +299,7 @@ impl Command {
format!("Failed to create output directory: {:?}", self.output_dir)
})?;
let start_block = self.from_block.unwrap_or(parent_number);
let start_block = self.from_block;
// Use pipelined execution when generating multiple payloads
if self.count > 1 {

View File

@@ -206,11 +206,33 @@ impl DeferredTrieData {
Default::default(), // prefix_sets are per-block, not cumulative
);
// Only trigger COW clone if there's actually data to add.
if !sorted_hashed_state.is_empty() {
Arc::make_mut(&mut overlay.state).extend_ref_and_sort(&sorted_hashed_state);
#[cfg(feature = "rayon")]
{
rayon::join(
|| {
if !sorted_hashed_state.is_empty() {
Arc::make_mut(&mut overlay.state)
.extend_ref_and_sort(&sorted_hashed_state);
}
},
|| {
if !sorted_trie_updates.is_empty() {
Arc::make_mut(&mut overlay.nodes)
.extend_ref_and_sort(&sorted_trie_updates);
}
},
);
}
if !sorted_trie_updates.is_empty() {
Arc::make_mut(&mut overlay.nodes).extend_ref_and_sort(&sorted_trie_updates);
#[cfg(not(feature = "rayon"))]
{
if !sorted_hashed_state.is_empty() {
Arc::make_mut(&mut overlay.state)
.extend_ref_and_sort(&sorted_hashed_state);
}
if !sorted_trie_updates.is_empty() {
Arc::make_mut(&mut overlay.nodes)
.extend_ref_and_sort(&sorted_trie_updates);
}
}
overlay
}
@@ -243,53 +265,8 @@ impl DeferredTrieData {
/// In normal operation, the parent always has a cached overlay and this
/// function is never called.
///
/// When the `rayon` feature is enabled, uses parallel collection and merge:
/// 1. Collects ancestor data in parallel (each `wait_cloned()` may compute)
/// 2. Merges hashed state and trie updates in parallel with each other
/// 3. Uses tree reduction within each merge for O(log n) depth
#[cfg(feature = "rayon")]
fn merge_ancestors_into_overlay(
ancestors: &[Self],
sorted_hashed_state: &HashedPostStateSorted,
sorted_trie_updates: &TrieUpdatesSorted,
) -> TrieInputSorted {
// Early exit: no ancestors means just wrap current block's data
if ancestors.is_empty() {
return TrieInputSorted::new(
Arc::new(sorted_trie_updates.clone()),
Arc::new(sorted_hashed_state.clone()),
Default::default(),
);
}
// Collect ancestor data, unzipping states and updates into Arc slices
let (states, updates): (Vec<_>, Vec<_>) = ancestors
.iter()
.map(|a| {
let data = a.wait_cloned();
(data.hashed_state, data.trie_updates)
})
.unzip();
// Merge state and nodes in parallel with each other using tree reduction
let (state, nodes) = rayon::join(
|| {
let mut merged = HashedPostStateSorted::merge_parallel(&states);
merged.extend_ref_and_sort(sorted_hashed_state);
merged
},
|| {
let mut merged = TrieUpdatesSorted::merge_parallel(&updates);
merged.extend_ref_and_sort(sorted_trie_updates);
merged
},
);
TrieInputSorted::new(Arc::new(nodes), Arc::new(state), Default::default())
}
/// Merge all ancestors and current block's data into a single overlay (sequential fallback).
#[cfg(not(feature = "rayon"))]
/// Iterates ancestors oldest -> newest, then extends with current block's data,
/// so later state takes precedence.
fn merge_ancestors_into_overlay(
ancestors: &[Self],
sorted_hashed_state: &HashedPostStateSorted,
@@ -307,8 +284,17 @@ impl DeferredTrieData {
}
// Extend with current block's sorted data last (takes precedence)
state_mut.extend_ref_and_sort(sorted_hashed_state);
nodes_mut.extend_ref_and_sort(sorted_trie_updates);
#[cfg(feature = "rayon")]
rayon::join(
|| state_mut.extend_ref_and_sort(sorted_hashed_state),
|| nodes_mut.extend_ref_and_sort(sorted_trie_updates),
);
#[cfg(not(feature = "rayon"))]
{
state_mut.extend_ref_and_sort(sorted_hashed_state);
nodes_mut.extend_ref_and_sort(sorted_trie_updates);
}
overlay
}

View File

@@ -17,7 +17,10 @@ use reth_primitives_traits::{
SignedTransaction,
};
use reth_storage_api::StateProviderBox;
use reth_trie::{updates::TrieUpdatesSorted, HashedPostStateSorted, LazyTrieData, TrieInputSorted};
use reth_trie::{
updates::TrieUpdatesSorted, HashedPostStateSorted, LazyTrieData, SortedTrieData,
TrieInputSorted,
};
use std::{collections::BTreeMap, sync::Arc, time::Instant};
use tokio::sync::{broadcast, watch};
@@ -948,22 +951,36 @@ impl<N: NodePrimitives<SignedTx: SignedTransaction>> NewCanonicalChain<N> {
match blocks {
[] => Chain::default(),
[first, rest @ ..] => {
let trie_data_handle = first.trie_data_handle();
let mut chain = Chain::from_block(
first.recovered_block().clone(),
ExecutionOutcome::from((
first.execution_outcome().clone(),
first.block_number(),
)),
LazyTrieData::ready(first.hashed_state(), first.trie_updates()),
LazyTrieData::deferred(move || {
let trie_data = trie_data_handle.wait_cloned();
SortedTrieData {
hashed_state: trie_data.hashed_state,
trie_updates: trie_data.trie_updates,
}
}),
);
for exec in rest {
let trie_data_handle = exec.trie_data_handle();
chain.append_block(
exec.recovered_block().clone(),
ExecutionOutcome::from((
exec.execution_outcome().clone(),
exec.block_number(),
)),
LazyTrieData::ready(exec.hashed_state(), exec.trie_updates()),
LazyTrieData::deferred(move || {
let trie_data = trie_data_handle.wait_cloned();
SortedTrieData {
hashed_state: trie_data.hashed_state,
trie_updates: trie_data.trie_updates,
}
}),
);
}
chain

View File

@@ -25,6 +25,7 @@ pub use alloy_chains::{Chain, ChainKind, NamedChain};
/// Re-export for convenience
pub use reth_ethereum_forks::*;
pub use alloy_evm::EvmLimitParams;
pub use api::EthChainSpec;
pub use info::ChainInfo;
#[cfg(any(test, feature = "test-utils"))]

View File

@@ -205,6 +205,16 @@ impl Command {
.add_cell(Cell::new(human_bytes(total_size as f64)))
.add_cell(Cell::new(human_bytes(total_pending as f64)));
table.add_row(row);
let wal_size = tool.provider_factory.rocksdb_provider().wal_size_bytes();
let mut row = Row::new();
row.add_cell(Cell::new("WAL"))
.add_cell(Cell::new(""))
.add_cell(Cell::new(""))
.add_cell(Cell::new(""))
.add_cell(Cell::new(human_bytes(wal_size as f64)))
.add_cell(Cell::new(""));
table.add_row(row);
}
table

View File

@@ -26,6 +26,14 @@ pub struct ImportCommand<C: ChainSpecParser> {
#[arg(long, value_name = "CHUNK_LEN", verbatim_doc_comment)]
chunk_len: Option<u64>,
/// Fail immediately when an invalid block is encountered.
///
/// By default, the import will stop at the last valid block if an invalid block is
/// encountered during execution or validation, leaving the database at the last valid
/// block state. When this flag is set, the import will instead fail with an error.
#[arg(long, verbatim_doc_comment)]
fail_on_invalid_block: bool,
/// The path(s) to block file(s) for import.
///
/// The online stages (headers and bodies) are replaced by a file import, after which the
@@ -52,7 +60,11 @@ impl<C: ChainSpecParser<ChainSpec: EthChainSpec + EthereumHardforks>> ImportComm
info!(target: "reth::cli", "Starting import of {} file(s)", self.paths.len());
let import_config = ImportConfig { no_state: self.no_state, chunk_len: self.chunk_len };
let import_config = ImportConfig {
no_state: self.no_state,
chunk_len: self.chunk_len,
fail_on_invalid_block: self.fail_on_invalid_block,
};
let executor = components.evm_config().clone();
let consensus = Arc::new(components.consensus().clone());
@@ -81,7 +93,20 @@ impl<C: ChainSpecParser<ChainSpec: EthChainSpec + EthereumHardforks>> ImportComm
total_decoded_blocks += result.total_decoded_blocks;
total_decoded_txns += result.total_decoded_txns;
if !result.is_complete() {
// Check if we stopped due to an invalid block
if result.stopped_on_invalid_block {
info!(target: "reth::cli",
"Stopped at last valid block {} due to invalid block {} in file: {}. Imported {} blocks, {} transactions",
result.last_valid_block.unwrap_or(0),
result.bad_block.unwrap_or(0),
path.display(),
result.total_imported_blocks,
result.total_imported_txns);
// Stop importing further files and exit successfully
break;
}
if !result.is_successful() {
return Err(eyre::eyre!(
"Chain was partially imported from file: {}. Imported {}/{} blocks, {}/{} transactions",
path.display(),
@@ -98,7 +123,7 @@ impl<C: ChainSpecParser<ChainSpec: EthChainSpec + EthereumHardforks>> ImportComm
}
info!(target: "reth::cli",
"All files imported successfully. Total: {}/{} blocks, {}/{} transactions",
"Import complete. Total: {}/{} blocks, {}/{} transactions",
total_imported_blocks, total_decoded_blocks, total_imported_txns, total_decoded_txns);
Ok(())
@@ -139,4 +164,20 @@ mod tests {
assert_eq!(args.paths[1], PathBuf::from("file2.rlp"));
assert_eq!(args.paths[2], PathBuf::from("file3.rlp"));
}
#[test]
fn parse_import_command_with_fail_on_invalid_block() {
let args: ImportCommand<EthereumChainSpecParser> =
ImportCommand::parse_from(["reth", "--fail-on-invalid-block", "chain.rlp"]);
assert!(args.fail_on_invalid_block);
assert_eq!(args.paths.len(), 1);
assert_eq!(args.paths[0], PathBuf::from("chain.rlp"));
}
#[test]
fn parse_import_command_default_stops_on_invalid_block() {
let args: ImportCommand<EthereumChainSpecParser> =
ImportCommand::parse_from(["reth", "chain.rlp"]);
assert!(!args.fail_on_invalid_block);
}
}

View File

@@ -22,11 +22,11 @@ use reth_provider::{
StageCheckpointReader,
};
use reth_prune::PruneModes;
use reth_stages::{prelude::*, Pipeline, StageId, StageSet};
use reth_stages::{prelude::*, ControlFlow, Pipeline, StageId, StageSet};
use reth_static_file::StaticFileProducer;
use std::{path::Path, sync::Arc};
use tokio::sync::watch;
use tracing::{debug, error, info};
use tracing::{debug, error, info, warn};
/// Configuration for importing blocks from RLP files.
#[derive(Debug, Clone, Default)]
@@ -35,6 +35,9 @@ pub struct ImportConfig {
pub no_state: bool,
/// Chunk byte length to read from file.
pub chunk_len: Option<u64>,
/// If true, fail immediately when an invalid block is encountered.
/// By default (false), the import stops at the last valid block and exits successfully.
pub fail_on_invalid_block: bool,
}
/// Result of an import operation.
@@ -48,6 +51,12 @@ pub struct ImportResult {
pub total_imported_blocks: usize,
/// Total number of transactions imported into the database.
pub total_imported_txns: usize,
/// Whether the import was stopped due to an invalid block.
pub stopped_on_invalid_block: bool,
/// The block number that was invalid, if any.
pub bad_block: Option<u64>,
/// The last valid block number when stopped due to invalid block.
pub last_valid_block: Option<u64>,
}
impl ImportResult {
@@ -56,6 +65,14 @@ impl ImportResult {
self.total_decoded_blocks == self.total_imported_blocks &&
self.total_decoded_txns == self.total_imported_txns
}
/// Returns true if the import was successful, considering stop-on-invalid-block mode.
///
/// In stop-on-invalid-block mode, a partial import is considered successful if we
/// stopped due to an invalid block (leaving the DB at the last valid block).
pub fn is_successful(&self) -> bool {
self.is_complete() || self.stopped_on_invalid_block
}
}
/// Imports blocks from an RLP-encoded file into the database.
@@ -103,6 +120,11 @@ where
let static_file_producer =
StaticFileProducer::new(provider_factory.clone(), PruneModes::default());
// Track if we stopped due to an invalid block
let mut stopped_on_invalid_block = false;
let mut bad_block_number: Option<u64> = None;
let mut last_valid_block_number: Option<u64> = None;
while let Some(file_client) =
reader.next_chunk::<BlockTy<N>>(consensus.clone(), Some(sealed_header)).await?
{
@@ -137,12 +159,51 @@ where
// Run pipeline
info!(target: "reth::import", "Starting sync pipeline");
tokio::select! {
res = pipeline.run() => res?,
_ = tokio::signal::ctrl_c() => {
info!(target: "reth::import", "Import interrupted by user");
break;
},
if import_config.fail_on_invalid_block {
// Original behavior: fail on unwind
tokio::select! {
res = pipeline.run() => res?,
_ = tokio::signal::ctrl_c() => {
info!(target: "reth::import", "Import interrupted by user");
break;
},
}
} else {
// Default behavior: Use run_loop() to handle unwinds gracefully
let result = tokio::select! {
res = pipeline.run_loop() => res,
_ = tokio::signal::ctrl_c() => {
info!(target: "reth::import", "Import interrupted by user");
break;
},
};
match result {
Ok(ControlFlow::Unwind { target, bad_block }) => {
// An invalid block was encountered; stop at last valid block
let bad = bad_block.block.number;
warn!(
target: "reth::import",
bad_block = bad,
last_valid_block = target,
"Invalid block encountered during import; stopping at last valid block"
);
stopped_on_invalid_block = true;
bad_block_number = Some(bad);
last_valid_block_number = Some(target);
break;
}
Ok(ControlFlow::Continue { block_number }) => {
debug!(target: "reth::import", block_number, "Pipeline chunk completed");
}
Ok(ControlFlow::NoProgress { block_number }) => {
debug!(target: "reth::import", ?block_number, "Pipeline made no progress");
}
Err(e) => {
// Propagate other pipeline errors
return Err(e.into());
}
}
}
sealed_header = provider_factory
@@ -160,9 +221,20 @@ where
total_decoded_txns,
total_imported_blocks,
total_imported_txns,
stopped_on_invalid_block,
bad_block: bad_block_number,
last_valid_block: last_valid_block_number,
};
if !result.is_complete() {
if result.stopped_on_invalid_block {
info!(target: "reth::import",
total_imported_blocks,
total_imported_txns,
bad_block = ?result.bad_block,
last_valid_block = ?result.last_valid_block,
"Import stopped at last valid block due to invalid block"
);
} else if !result.is_complete() {
error!(target: "reth::import",
total_decoded_blocks,
total_imported_blocks,

View File

@@ -1,14 +1,11 @@
//! Collection of methods for block validation.
use alloy_consensus::{BlockHeader as _, Transaction, EMPTY_OMMER_ROOT_HASH};
use alloy_consensus::{BlockHeader as _, EMPTY_OMMER_ROOT_HASH};
use alloy_eips::{eip4844::DATA_GAS_PER_BLOB, eip7840::BlobParams};
use reth_chainspec::{EthChainSpec, EthereumHardfork, EthereumHardforks};
use reth_consensus::{ConsensusError, TxGasLimitTooHighErr};
use reth_consensus::ConsensusError;
use reth_primitives_traits::{
constants::{
GAS_LIMIT_BOUND_DIVISOR, MAXIMUM_GAS_LIMIT_BLOCK, MAX_TX_GAS_LIMIT_OSAKA, MINIMUM_GAS_LIMIT,
},
transaction::TxHashRef,
constants::{GAS_LIMIT_BOUND_DIVISOR, MAXIMUM_GAS_LIMIT_BLOCK, MINIMUM_GAS_LIMIT},
Block, BlockBody, BlockHeader, GotExpected, SealedBlock, SealedHeader,
};
@@ -146,7 +143,7 @@ pub fn validate_block_pre_execution<B, ChainSpec>(
) -> Result<(), ConsensusError>
where
B: Block,
ChainSpec: EthereumHardforks,
ChainSpec: EthChainSpec + EthereumHardforks,
{
post_merge_hardfork_fields(block, chain_spec)?;
@@ -154,19 +151,6 @@ where
if let Err(error) = block.ensure_transaction_root_valid() {
return Err(ConsensusError::BodyTransactionRootDiff(error.into()))
}
// EIP-7825 validation
if chain_spec.is_osaka_active_at_timestamp(block.timestamp()) {
for tx in block.body().transactions() {
if tx.gas_limit() > MAX_TX_GAS_LIMIT_OSAKA {
return Err(TxGasLimitTooHighErr {
tx_hash: *tx.tx_hash(),
gas_limit: tx.gas_limit(),
max_allowed: MAX_TX_GAS_LIMIT_OSAKA,
}
.into());
}
}
}
Ok(())
}

View File

@@ -72,3 +72,11 @@ derive_more.workspace = true
[[test]]
name = "e2e_testsuite"
path = "tests/e2e-testsuite/main.rs"
[[test]]
name = "rocksdb"
path = "tests/rocksdb/main.rs"
required-features = ["edge"]
[features]
edge = ["reth-node-core/edge", "reth-provider/rocksdb", "reth-cli-commands/edge"]

View File

@@ -103,7 +103,10 @@ where
N: NodeBuilderHelper,
{
E2ETestSetupBuilder::new(num_nodes, chain_spec, attributes_generator)
.with_tree_config_modifier(move |_| tree_config.clone())
.with_tree_config_modifier(move |base| {
// Apply caller's tree_config but preserve the small cache size from base
tree_config.clone().with_cross_block_cache_size(base.cross_block_cache_size())
})
.with_node_config_modifier(move |config| config.set_dev(is_dev))
.with_connect_nodes(connect_nodes)
.build()

View File

@@ -112,11 +112,13 @@ where
..NetworkArgs::default()
};
// Apply tree config modifier if present
// Apply tree config modifier if present, with test-appropriate defaults
let base_tree_config =
reth_node_api::TreeConfig::default().with_cross_block_cache_size(1024 * 1024);
let tree_config = if let Some(modifier) = self.tree_config_modifier {
modifier(reth_node_api::TreeConfig::default())
modifier(base_tree_config)
} else {
reth_node_api::TreeConfig::default()
base_tree_config
};
let mut nodes = (0..self.num_nodes)

View File

@@ -38,6 +38,18 @@ impl TransactionTestContext {
signed.encoded_2718().into()
}
/// Creates a transfer with a specific nonce and signs it, returning bytes.
/// Uses high `max_fee_per_gas` (1000 gwei) to ensure tx acceptance regardless of basefee.
pub async fn transfer_tx_bytes_with_nonce(
chain_id: u64,
wallet: PrivateKeySigner,
nonce: u64,
) -> Bytes {
let tx = tx(chain_id, 21000, None, None, nonce, Some(1000e9 as u128));
let signed = Self::sign_tx(wallet, tx).await;
signed.encoded_2718().into()
}
/// Creates a deployment transaction and signs it, returning an envelope.
pub async fn deploy_tx(
chain_id: u64,

View File

@@ -0,0 +1,470 @@
//! E2E tests for `RocksDB` provider functionality.
#![cfg(all(feature = "edge", unix))]
use alloy_consensus::BlockHeader;
use alloy_primitives::B256;
use alloy_rpc_types_eth::{Transaction, TransactionReceipt};
use eyre::Result;
use jsonrpsee::core::client::ClientT;
use reth_chainspec::{ChainSpec, ChainSpecBuilder, MAINNET};
use reth_db::tables;
use reth_e2e_test_utils::{transaction::TransactionTestContext, wallet, E2ETestSetupBuilder};
use reth_node_builder::NodeConfig;
use reth_node_core::args::RocksDbArgs;
use reth_node_ethereum::EthereumNode;
use reth_payload_builder::EthPayloadBuilderAttributes;
use reth_provider::RocksDBProviderFactory;
use std::{sync::Arc, time::Duration};
const ROCKSDB_POLL_TIMEOUT: Duration = Duration::from_secs(60);
const ROCKSDB_POLL_INTERVAL: Duration = Duration::from_millis(50);
/// Polls RPC until the given `tx_hash` is visible as pending (not yet mined).
/// Prevents race conditions where `advance_block` is called before txs are in the pool.
/// Returns the pending transaction.
async fn wait_for_pending_tx<C: ClientT>(client: &C, tx_hash: B256) -> Transaction {
let start = std::time::Instant::now();
loop {
let tx: Option<Transaction> = client
.request("eth_getTransactionByHash", [tx_hash])
.await
.expect("RPC request failed");
if let Some(tx) = tx {
assert!(
tx.block_number.is_none(),
"Expected pending tx but tx_hash={tx_hash:?} is already mined in block {:?}",
tx.block_number
);
return tx;
}
assert!(
start.elapsed() < ROCKSDB_POLL_TIMEOUT,
"Timed out after {:?} waiting for tx_hash={tx_hash:?} to appear in pending pool",
start.elapsed()
);
tokio::time::sleep(ROCKSDB_POLL_INTERVAL).await;
}
}
/// Polls `RocksDB` until the given `tx_hash` appears in `TransactionHashNumbers`.
/// Returns the `tx_number` on success, or panics on timeout.
async fn poll_tx_in_rocksdb<P: RocksDBProviderFactory>(provider: &P, tx_hash: B256) -> u64 {
let start = std::time::Instant::now();
let mut interval = ROCKSDB_POLL_INTERVAL;
loop {
// Re-acquire handle each iteration to avoid stale snapshot reads
let rocksdb = provider.rocksdb_provider();
let tx_number: Option<u64> =
rocksdb.get::<tables::TransactionHashNumbers>(tx_hash).expect("RocksDB get failed");
if let Some(n) = tx_number {
return n;
}
assert!(
start.elapsed() < ROCKSDB_POLL_TIMEOUT,
"Timed out after {:?} waiting for tx_hash={tx_hash:?} in RocksDB",
start.elapsed()
);
tokio::time::sleep(interval).await;
// Simple backoff: 50ms -> 100ms -> 200ms (capped)
interval = std::cmp::min(interval * 2, Duration::from_millis(200));
}
}
/// Returns the test chain spec for `RocksDB` tests.
fn test_chain_spec() -> Arc<ChainSpec> {
Arc::new(
ChainSpecBuilder::default()
.chain(MAINNET.chain)
.genesis(
serde_json::from_str(include_str!("../../src/testsuite/assets/genesis.json"))
.expect("failed to parse genesis.json"),
)
.cancun_activated()
.build(),
)
}
/// Returns test payload attributes for the given timestamp.
fn test_attributes_generator(timestamp: u64) -> EthPayloadBuilderAttributes {
let attributes = alloy_rpc_types_engine::PayloadAttributes {
timestamp,
prev_randao: B256::ZERO,
suggested_fee_recipient: alloy_primitives::Address::ZERO,
withdrawals: Some(vec![]),
parent_beacon_block_root: Some(B256::ZERO),
};
EthPayloadBuilderAttributes::new(B256::ZERO, attributes)
}
/// Enables `RocksDB` for `TransactionHashNumbers` table.
///
/// Note: Static file changesets are disabled because `persistence_threshold(0)` causes
/// a race where the static file writer expects sequential block numbers but receives
/// them out of order, resulting in `UnexpectedStaticFileBlockNumber` errors.
fn with_rocksdb_enabled<C>(mut config: NodeConfig<C>) -> NodeConfig<C> {
config.rocksdb = RocksDbArgs { tx_hash: true, ..Default::default() };
config.static_files.storage_changesets = false;
config.static_files.account_changesets = false;
config
}
/// Smoke test: node boots with `RocksDB` routing enabled.
#[tokio::test]
async fn test_rocksdb_node_startup() -> Result<()> {
reth_tracing::init_test_tracing();
let chain_spec = test_chain_spec();
let (nodes, _tasks, _wallet) =
E2ETestSetupBuilder::<EthereumNode, _>::new(1, chain_spec, test_attributes_generator)
.with_node_config_modifier(with_rocksdb_enabled)
.build()
.await?;
assert_eq!(nodes.len(), 1);
// Verify RocksDB provider is functional (can query without error)
let rocksdb = nodes[0].inner.provider.rocksdb_provider();
let missing_hash = B256::from([0xab; 32]);
let result: Option<u64> = rocksdb.get::<tables::TransactionHashNumbers>(missing_hash)?;
assert!(result.is_none(), "Missing hash should return None");
let genesis_hash = nodes[0].block_hash(0);
assert_ne!(genesis_hash, B256::ZERO);
Ok(())
}
/// Block mining works with `RocksDB` storage.
#[tokio::test]
async fn test_rocksdb_block_mining() -> Result<()> {
reth_tracing::init_test_tracing();
let chain_spec = test_chain_spec();
let chain_id = chain_spec.chain().id();
let (mut nodes, _tasks, _wallet) =
E2ETestSetupBuilder::<EthereumNode, _>::new(1, chain_spec, test_attributes_generator)
.with_node_config_modifier(with_rocksdb_enabled)
.build()
.await?;
assert_eq!(nodes.len(), 1);
let genesis_hash = nodes[0].block_hash(0);
assert_ne!(genesis_hash, B256::ZERO);
// Mine 3 blocks with transactions
let wallets = wallet::Wallet::new(1).with_chain_id(chain_id).wallet_gen();
let signer = wallets[0].clone();
let client = nodes[0].rpc_client().expect("RPC client should be available");
for i in 1..=3u64 {
let raw_tx =
TransactionTestContext::transfer_tx_bytes_with_nonce(chain_id, signer.clone(), i - 1)
.await;
let tx_hash = nodes[0].rpc.inject_tx(raw_tx).await?;
// Wait for tx to enter pending pool before mining
wait_for_pending_tx(&client, tx_hash).await;
let payload = nodes[0].advance_block().await?;
let block = payload.block();
assert_eq!(block.number(), i);
assert_ne!(block.hash(), B256::ZERO);
// Verify tx was actually included in the block
let receipt: Option<TransactionReceipt> =
client.request("eth_getTransactionReceipt", [tx_hash]).await?;
let receipt = receipt.expect("Receipt should exist after mining");
assert_eq!(receipt.block_number, Some(i), "Tx should be in block {i}");
}
// Verify all blocks are stored
for i in 0..=3 {
let block_hash = nodes[0].block_hash(i);
assert_ne!(block_hash, B256::ZERO);
}
Ok(())
}
/// Tx hash lookup exercises `TransactionHashNumbers` table.
#[tokio::test]
async fn test_rocksdb_transaction_queries() -> Result<()> {
reth_tracing::init_test_tracing();
let chain_spec = test_chain_spec();
let chain_id = chain_spec.chain().id();
let (mut nodes, _tasks, _) = E2ETestSetupBuilder::<EthereumNode, _>::new(
1,
chain_spec.clone(),
test_attributes_generator,
)
.with_node_config_modifier(with_rocksdb_enabled)
.with_tree_config_modifier(|config| config.with_persistence_threshold(0))
.build()
.await?;
assert_eq!(nodes.len(), 1);
// Inject and mine a transaction
let wallets = wallet::Wallet::new(1).with_chain_id(chain_id).wallet_gen();
let signer = wallets[0].clone();
let client = nodes[0].rpc_client().expect("RPC client should be available");
let raw_tx = TransactionTestContext::transfer_tx_bytes(chain_id, signer).await;
let tx_hash = nodes[0].rpc.inject_tx(raw_tx).await?;
// Wait for tx to enter pending pool before mining
wait_for_pending_tx(&client, tx_hash).await;
let payload = nodes[0].advance_block().await?;
assert_eq!(payload.block().number(), 1);
// Query each transaction by hash
let tx: Option<Transaction> = client.request("eth_getTransactionByHash", [tx_hash]).await?;
let tx = tx.expect("Transaction should be found");
assert_eq!(tx.block_number, Some(1));
let receipt: Option<TransactionReceipt> =
client.request("eth_getTransactionReceipt", [tx_hash]).await?;
let receipt = receipt.expect("Receipt should be found");
assert_eq!(receipt.block_number, Some(1));
assert!(receipt.status());
// Direct RocksDB assertion - poll with timeout since persistence is async
let tx_number = poll_tx_in_rocksdb(&nodes[0].inner.provider, tx_hash).await;
assert_eq!(tx_number, 0, "First tx should have TxNumber 0");
// Verify missing hash returns None
let missing_hash = B256::from([0xde; 32]);
let rocksdb = nodes[0].inner.provider.rocksdb_provider();
let missing_tx_number: Option<u64> =
rocksdb.get::<tables::TransactionHashNumbers>(missing_hash)?;
assert!(missing_tx_number.is_none());
let missing_tx: Option<Transaction> =
client.request("eth_getTransactionByHash", [missing_hash]).await?;
assert!(missing_tx.is_none(), "expected no transaction for missing hash");
let missing_receipt: Option<TransactionReceipt> =
client.request("eth_getTransactionReceipt", [missing_hash]).await?;
assert!(missing_receipt.is_none(), "expected no receipt for missing hash");
Ok(())
}
/// Multiple transactions in the same block are correctly persisted to `RocksDB`.
#[tokio::test]
async fn test_rocksdb_multi_tx_same_block() -> Result<()> {
reth_tracing::init_test_tracing();
let chain_spec = test_chain_spec();
let chain_id = chain_spec.chain().id();
let (mut nodes, _tasks, _) = E2ETestSetupBuilder::<EthereumNode, _>::new(
1,
chain_spec.clone(),
test_attributes_generator,
)
.with_node_config_modifier(with_rocksdb_enabled)
.with_tree_config_modifier(|config| config.with_persistence_threshold(0))
.build()
.await?;
// Create 3 txs from the same wallet with sequential nonces
let wallets = wallet::Wallet::new(1).with_chain_id(chain_id).wallet_gen();
let signer = wallets[0].clone();
let client = nodes[0].rpc_client().expect("RPC client");
let mut tx_hashes = Vec::new();
for nonce in 0..3 {
let raw_tx =
TransactionTestContext::transfer_tx_bytes_with_nonce(chain_id, signer.clone(), nonce)
.await;
let tx_hash = nodes[0].rpc.inject_tx(raw_tx).await?;
tx_hashes.push(tx_hash);
}
// Wait for all txs to appear in pending pool before mining
for tx_hash in &tx_hashes {
wait_for_pending_tx(&client, *tx_hash).await;
}
// Mine one block containing all 3 txs
let payload = nodes[0].advance_block().await?;
assert_eq!(payload.block().number(), 1);
// Verify block contains all 3 txs
let block: Option<alloy_rpc_types_eth::Block> =
client.request("eth_getBlockByNumber", ("0x1", true)).await?;
let block = block.expect("Block 1 should exist");
assert_eq!(block.transactions.len(), 3, "Block should contain 3 txs");
// Verify each tx via RPC
for tx_hash in &tx_hashes {
let tx: Option<Transaction> = client.request("eth_getTransactionByHash", [tx_hash]).await?;
let tx = tx.expect("Transaction should be found");
assert_eq!(tx.block_number, Some(1), "All txs should be in block 1");
}
// Poll RocksDB for all tx hashes and collect tx_numbers
let mut tx_numbers = Vec::new();
for tx_hash in &tx_hashes {
let n = poll_tx_in_rocksdb(&nodes[0].inner.provider, *tx_hash).await;
tx_numbers.push(n);
}
// Verify tx_numbers form the set {0, 1, 2}
tx_numbers.sort();
assert_eq!(tx_numbers, vec![0, 1, 2], "TxNumbers should be 0, 1, 2");
Ok(())
}
/// Transactions across multiple blocks have globally continuous `tx_numbers`.
#[tokio::test]
async fn test_rocksdb_txs_across_blocks() -> Result<()> {
reth_tracing::init_test_tracing();
let chain_spec = test_chain_spec();
let chain_id = chain_spec.chain().id();
let (mut nodes, _tasks, _) = E2ETestSetupBuilder::<EthereumNode, _>::new(
1,
chain_spec.clone(),
test_attributes_generator,
)
.with_node_config_modifier(with_rocksdb_enabled)
.with_tree_config_modifier(|config| config.with_persistence_threshold(0))
.build()
.await?;
let wallets = wallet::Wallet::new(1).with_chain_id(chain_id).wallet_gen();
let signer = wallets[0].clone();
let client = nodes[0].rpc_client().expect("RPC client");
// Block 1: 2 transactions
let tx_hash_0 = nodes[0]
.rpc
.inject_tx(
TransactionTestContext::transfer_tx_bytes_with_nonce(chain_id, signer.clone(), 0).await,
)
.await?;
let tx_hash_1 = nodes[0]
.rpc
.inject_tx(
TransactionTestContext::transfer_tx_bytes_with_nonce(chain_id, signer.clone(), 1).await,
)
.await?;
// Wait for both txs to appear in pending pool
wait_for_pending_tx(&client, tx_hash_0).await;
wait_for_pending_tx(&client, tx_hash_1).await;
let payload1 = nodes[0].advance_block().await?;
assert_eq!(payload1.block().number(), 1);
// Block 2: 1 transaction
let tx_hash_2 = nodes[0]
.rpc
.inject_tx(
TransactionTestContext::transfer_tx_bytes_with_nonce(chain_id, signer.clone(), 2).await,
)
.await?;
wait_for_pending_tx(&client, tx_hash_2).await;
let payload2 = nodes[0].advance_block().await?;
assert_eq!(payload2.block().number(), 2);
// Verify block contents via RPC
let tx0: Option<Transaction> = client.request("eth_getTransactionByHash", [tx_hash_0]).await?;
let tx1: Option<Transaction> = client.request("eth_getTransactionByHash", [tx_hash_1]).await?;
let tx2: Option<Transaction> = client.request("eth_getTransactionByHash", [tx_hash_2]).await?;
assert_eq!(tx0.expect("tx0").block_number, Some(1));
assert_eq!(tx1.expect("tx1").block_number, Some(1));
assert_eq!(tx2.expect("tx2").block_number, Some(2));
// Poll RocksDB and verify global tx_number continuity
let all_tx_hashes = [tx_hash_0, tx_hash_1, tx_hash_2];
let mut tx_numbers = Vec::new();
for tx_hash in &all_tx_hashes {
let n = poll_tx_in_rocksdb(&nodes[0].inner.provider, *tx_hash).await;
tx_numbers.push(n);
}
// Verify they form a continuous sequence {0, 1, 2}
tx_numbers.sort();
assert_eq!(tx_numbers, vec![0, 1, 2], "TxNumbers should be globally continuous: 0, 1, 2");
// Re-query block 1 txs after block 2 is mined (regression guard)
let tx0_again: Option<Transaction> =
client.request("eth_getTransactionByHash", [tx_hash_0]).await?;
assert!(tx0_again.is_some(), "Block 1 tx should still be queryable after block 2");
Ok(())
}
/// Pending transactions should NOT appear in `RocksDB` until mined.
#[tokio::test]
async fn test_rocksdb_pending_tx_not_in_storage() -> Result<()> {
reth_tracing::init_test_tracing();
let chain_spec = test_chain_spec();
let chain_id = chain_spec.chain().id();
let (mut nodes, _tasks, _) = E2ETestSetupBuilder::<EthereumNode, _>::new(
1,
chain_spec.clone(),
test_attributes_generator,
)
.with_node_config_modifier(with_rocksdb_enabled)
.with_tree_config_modifier(|config| config.with_persistence_threshold(0))
.build()
.await?;
let wallets = wallet::Wallet::new(1).with_chain_id(chain_id).wallet_gen();
let signer = wallets[0].clone();
// Inject tx but do NOT mine
let raw_tx = TransactionTestContext::transfer_tx_bytes(chain_id, signer).await;
let tx_hash = nodes[0].rpc.inject_tx(raw_tx).await?;
// Verify tx is in pending pool via RPC
let client = nodes[0].rpc_client().expect("RPC client");
wait_for_pending_tx(&client, tx_hash).await;
let pending_tx: Option<Transaction> =
client.request("eth_getTransactionByHash", [tx_hash]).await?;
assert!(pending_tx.is_some(), "Pending tx should be visible via RPC");
assert!(pending_tx.unwrap().block_number.is_none(), "Pending tx should have no block_number");
// Assert tx is NOT in RocksDB before mining (single check - tx is confirmed pending)
let rocksdb = nodes[0].inner.provider.rocksdb_provider();
let tx_number: Option<u64> = rocksdb.get::<tables::TransactionHashNumbers>(tx_hash)?;
assert!(
tx_number.is_none(),
"Pending tx should NOT be in RocksDB before mining, but found tx_number={:?}",
tx_number
);
// Now mine the block
let payload = nodes[0].advance_block().await?;
assert_eq!(payload.block().number(), 1);
// Poll until tx appears in RocksDB
let tx_number = poll_tx_in_rocksdb(&nodes[0].inner.provider, tx_hash).await;
assert_eq!(tx_number, 0, "First tx should have tx_number 0");
// Verify tx is now mined via RPC
let mined_tx: Option<Transaction> =
client.request("eth_getTransactionByHash", [tx_hash]).await?;
assert_eq!(mined_tx.expect("mined tx").block_number, Some(1));
Ok(())
}

View File

@@ -50,7 +50,17 @@ pub const DEFAULT_PREWARM_MAX_CONCURRENCY: usize = 16;
const DEFAULT_BLOCK_BUFFER_LIMIT: u32 = EPOCH_SLOTS as u32 * 2;
const DEFAULT_MAX_INVALID_HEADER_CACHE_LENGTH: u32 = 256;
const DEFAULT_MAX_EXECUTE_BLOCK_BATCH_SIZE: usize = 4;
const DEFAULT_CROSS_BLOCK_CACHE_SIZE: u64 = 4 * 1024 * 1024 * 1024;
const DEFAULT_CROSS_BLOCK_CACHE_SIZE: usize = default_cross_block_cache_size();
const fn default_cross_block_cache_size() -> usize {
if cfg!(test) {
1024 * 1024 // 1 MB in tests
} else if cfg!(target_pointer_width = "32") {
usize::MAX // max possible on wasm32 / 32-bit
} else {
4 * 1024 * 1024 * 1024 // 4 GB on 64-bit
}
}
/// Determines if the host has enough parallelism to run the payload processor.
///
@@ -100,12 +110,10 @@ pub struct TreeConfig {
disable_state_cache: bool,
/// Whether to disable parallel prewarming.
disable_prewarming: bool,
/// Whether to disable the parallel sparse trie state root algorithm.
disable_parallel_sparse_trie: bool,
/// Whether to enable state provider metrics.
state_provider_metrics: bool,
/// Cross-block cache size in bytes.
cross_block_cache_size: u64,
cross_block_cache_size: usize,
/// Whether the host has enough parallelism to run state root task.
has_enough_parallelism: bool,
/// Whether multiproof task should chunk proof targets.
@@ -158,7 +166,6 @@ impl Default for TreeConfig {
always_compare_trie_updates: false,
disable_state_cache: false,
disable_prewarming: false,
disable_parallel_sparse_trie: false,
state_provider_metrics: false,
cross_block_cache_size: DEFAULT_CROSS_BLOCK_CACHE_SIZE,
has_enough_parallelism: has_enough_parallelism(),
@@ -191,9 +198,8 @@ impl TreeConfig {
always_compare_trie_updates: bool,
disable_state_cache: bool,
disable_prewarming: bool,
disable_parallel_sparse_trie: bool,
state_provider_metrics: bool,
cross_block_cache_size: u64,
cross_block_cache_size: usize,
has_enough_parallelism: bool,
multiproof_chunking_enabled: bool,
multiproof_chunk_size: usize,
@@ -218,7 +224,6 @@ impl TreeConfig {
always_compare_trie_updates,
disable_state_cache,
disable_prewarming,
disable_parallel_sparse_trie,
state_provider_metrics,
cross_block_cache_size,
has_enough_parallelism,
@@ -299,11 +304,6 @@ impl TreeConfig {
self.state_provider_metrics
}
/// Returns whether or not the parallel sparse trie is disabled.
pub const fn disable_parallel_sparse_trie(&self) -> bool {
self.disable_parallel_sparse_trie
}
/// Returns whether or not state cache is disabled.
pub const fn disable_state_cache(&self) -> bool {
self.disable_state_cache
@@ -321,7 +321,7 @@ impl TreeConfig {
}
/// Returns the cross-block cache size.
pub const fn cross_block_cache_size(&self) -> u64 {
pub const fn cross_block_cache_size(&self) -> usize {
self.cross_block_cache_size
}
@@ -424,7 +424,7 @@ impl TreeConfig {
}
/// Setter for cross block cache size.
pub const fn with_cross_block_cache_size(mut self, cross_block_cache_size: u64) -> Self {
pub const fn with_cross_block_cache_size(mut self, cross_block_cache_size: usize) -> Self {
self.cross_block_cache_size = cross_block_cache_size;
self
}
@@ -441,15 +441,6 @@ impl TreeConfig {
self
}
/// Setter for whether to disable the parallel sparse trie
pub const fn with_disable_parallel_sparse_trie(
mut self,
disable_parallel_sparse_trie: bool,
) -> Self {
self.disable_parallel_sparse_trie = disable_parallel_sparse_trie;
self
}
/// Setter for whether multiproof task should chunk proof targets.
pub const fn with_multiproof_chunking_enabled(
mut self,

View File

@@ -53,7 +53,7 @@ revm-primitives.workspace = true
futures.workspace = true
thiserror.workspace = true
tokio = { workspace = true, features = ["rt", "rt-multi-thread", "sync", "macros"] }
mini-moka = { workspace = true, features = ["sync"] }
fixed-cache.workspace = true
moka = { workspace = true, features = ["sync"] }
smallvec.workspace = true

File diff suppressed because it is too large Load Diff

View File

@@ -22,6 +22,9 @@ pub(crate) struct EngineApiMetrics {
pub(crate) block_validation: BlockValidationMetrics,
/// Canonical chain and reorg related metrics
pub tree: TreeMetrics,
/// Metrics for EIP-7928 Block-Level Access Lists (BAL).
#[allow(dead_code)]
pub(crate) bal: BalMetrics,
}
impl EngineApiMetrics {
@@ -239,6 +242,8 @@ pub(crate) struct NewPayloadStatusMetrics {
pub(crate) new_payload_error: Counter,
/// The total gas of valid new payload messages received.
pub(crate) new_payload_total_gas: Histogram,
/// The gas used for the last valid new payload.
pub(crate) new_payload_total_gas_last: Gauge,
/// The gas per second of valid new payload messages received.
pub(crate) new_payload_gas_per_second: Histogram,
/// The gas per second for the last new payload call.
@@ -251,6 +256,8 @@ pub(crate) struct NewPayloadStatusMetrics {
pub(crate) time_between_new_payloads: Histogram,
/// Time from previous payload start to current payload start (total interval).
pub(crate) new_payload_interval: Histogram,
/// Time diff between forkchoice updated call response and the next new payload call request.
pub(crate) forkchoice_updated_new_payload_time_diff: Histogram,
}
impl NewPayloadStatusMetrics {
@@ -258,6 +265,7 @@ impl NewPayloadStatusMetrics {
pub(crate) fn update_response_metrics(
&mut self,
start: Instant,
latest_forkchoice_updated_at: &mut Option<Instant>,
result: &Result<TreeOutcome<PayloadStatus>, InsertBlockFatalError>,
gas_used: u64,
) {
@@ -277,6 +285,7 @@ impl NewPayloadStatusMetrics {
PayloadStatusEnum::Valid => {
self.new_payload_valid.increment(1);
self.new_payload_total_gas.record(gas_used as f64);
self.new_payload_total_gas_last.set(gas_used as f64);
let gas_per_second = gas_used as f64 / elapsed.as_secs_f64();
self.new_payload_gas_per_second.record(gas_per_second);
self.new_payload_gas_per_second_last.set(gas_per_second);
@@ -290,9 +299,40 @@ impl NewPayloadStatusMetrics {
self.new_payload_messages.increment(1);
self.new_payload_latency.record(elapsed);
self.new_payload_last.set(elapsed);
if let Some(latest_forkchoice_updated_at) = latest_forkchoice_updated_at.take() {
self.forkchoice_updated_new_payload_time_diff
.record(start - latest_forkchoice_updated_at);
}
}
}
/// Metrics for EIP-7928 Block-Level Access Lists (BAL).
///
/// See also <https://github.com/ethereum/execution-metrics/issues/5>
#[allow(dead_code)]
#[derive(Metrics, Clone)]
#[metrics(scope = "execution.block_access_list")]
pub(crate) struct BalMetrics {
/// Size of the BAL in bytes for the current block.
pub(crate) size_bytes: Gauge,
/// Total number of blocks with valid BALs.
pub(crate) valid_total: Counter,
/// Total number of blocks with invalid BALs.
pub(crate) invalid_total: Counter,
/// Time taken to validate the BAL against actual execution.
pub(crate) validation_time_seconds: Histogram,
/// Number of account changes in the BAL.
pub(crate) account_changes: Gauge,
/// Number of storage changes in the BAL.
pub(crate) storage_changes: Gauge,
/// Number of balance changes in the BAL.
pub(crate) balance_changes: Gauge,
/// Number of nonce changes in the BAL.
pub(crate) nonce_changes: Gauge,
/// Number of code changes in the BAL.
pub(crate) code_changes: Gauge,
}
/// Metrics for non-execution related block validation.
#[derive(Metrics, Clone)]
#[metrics(scope = "sync.block_validation")]
@@ -301,6 +341,8 @@ pub(crate) struct BlockValidationMetrics {
pub(crate) state_root_storage_tries_updated_total: Counter,
/// Total number of times the parallel state root computation fell back to regular.
pub(crate) state_root_parallel_fallback_total: Counter,
/// Total number of times the state root task failed but the fallback succeeded.
pub(crate) state_root_task_fallback_success_total: Counter,
/// Latest state root duration, ie the time spent blocked waiting for the state root.
pub(crate) state_root_duration: Gauge,
/// Histogram for state root duration ie the time spent blocked waiting for the state root

View File

@@ -1492,6 +1492,10 @@ where
self.on_maybe_tree_event(res.event.take())?;
}
if let Err(ref err) = output {
error!(target: "engine::tree", %err, ?state, "Error processing forkchoice update");
}
self.metrics.engine.forkchoice_updated.update_response_metrics(
start,
&mut self.metrics.engine.new_payload.latest_finish_at,
@@ -1514,10 +1518,12 @@ where
let gas_used = payload.gas_used();
let num_hash = payload.num_hash();
let mut output = self.on_new_payload(payload);
self.metrics
.engine
.new_payload
.update_response_metrics(start, &output, gas_used);
self.metrics.engine.new_payload.update_response_metrics(
start,
&mut self.metrics.engine.forkchoice_updated.latest_finish_at,
&output,
gas_used,
);
let maybe_event =
output.as_mut().ok().and_then(|out| out.event.take());

View File

@@ -1,188 +0,0 @@
//! Configured sparse trie enum for switching between serial and parallel implementations.
use alloy_primitives::B256;
use reth_trie::{BranchNodeMasks, Nibbles, ProofTrieNode, TrieNode};
use reth_trie_sparse::{
errors::SparseTrieResult, provider::TrieNodeProvider, LeafLookup, LeafLookupError,
SerialSparseTrie, SparseTrieInterface, SparseTrieUpdates,
};
use reth_trie_sparse_parallel::ParallelSparseTrie;
use std::borrow::Cow;
/// Enum for switching between serial and parallel sparse trie implementations.
///
/// This type allows runtime selection between different sparse trie implementations,
/// providing flexibility in choosing the appropriate implementation based on workload
/// characteristics.
#[derive(Debug, Clone)]
pub(crate) enum ConfiguredSparseTrie {
/// Serial implementation of the sparse trie.
Serial(Box<SerialSparseTrie>),
/// Parallel implementation of the sparse trie.
Parallel(Box<ParallelSparseTrie>),
}
impl From<SerialSparseTrie> for ConfiguredSparseTrie {
fn from(trie: SerialSparseTrie) -> Self {
Self::Serial(Box::new(trie))
}
}
impl From<ParallelSparseTrie> for ConfiguredSparseTrie {
fn from(trie: ParallelSparseTrie) -> Self {
Self::Parallel(Box::new(trie))
}
}
impl Default for ConfiguredSparseTrie {
fn default() -> Self {
Self::Serial(Default::default())
}
}
impl SparseTrieInterface for ConfiguredSparseTrie {
fn with_root(
self,
root: TrieNode,
masks: Option<BranchNodeMasks>,
retain_updates: bool,
) -> SparseTrieResult<Self> {
match self {
Self::Serial(trie) => {
trie.with_root(root, masks, retain_updates).map(|t| Self::Serial(Box::new(t)))
}
Self::Parallel(trie) => {
trie.with_root(root, masks, retain_updates).map(|t| Self::Parallel(Box::new(t)))
}
}
}
fn with_updates(self, retain_updates: bool) -> Self {
match self {
Self::Serial(trie) => Self::Serial(Box::new(trie.with_updates(retain_updates))),
Self::Parallel(trie) => Self::Parallel(Box::new(trie.with_updates(retain_updates))),
}
}
fn reserve_nodes(&mut self, additional: usize) {
match self {
Self::Serial(trie) => trie.reserve_nodes(additional),
Self::Parallel(trie) => trie.reserve_nodes(additional),
}
}
fn reveal_node(
&mut self,
path: Nibbles,
node: TrieNode,
masks: Option<BranchNodeMasks>,
) -> SparseTrieResult<()> {
match self {
Self::Serial(trie) => trie.reveal_node(path, node, masks),
Self::Parallel(trie) => trie.reveal_node(path, node, masks),
}
}
fn reveal_nodes(&mut self, nodes: Vec<ProofTrieNode>) -> SparseTrieResult<()> {
match self {
Self::Serial(trie) => trie.reveal_nodes(nodes),
Self::Parallel(trie) => trie.reveal_nodes(nodes),
}
}
fn update_leaf<P: TrieNodeProvider>(
&mut self,
full_path: Nibbles,
value: Vec<u8>,
provider: P,
) -> SparseTrieResult<()> {
match self {
Self::Serial(trie) => trie.update_leaf(full_path, value, provider),
Self::Parallel(trie) => trie.update_leaf(full_path, value, provider),
}
}
fn remove_leaf<P: TrieNodeProvider>(
&mut self,
full_path: &Nibbles,
provider: P,
) -> SparseTrieResult<()> {
match self {
Self::Serial(trie) => trie.remove_leaf(full_path, provider),
Self::Parallel(trie) => trie.remove_leaf(full_path, provider),
}
}
fn root(&mut self) -> B256 {
match self {
Self::Serial(trie) => trie.root(),
Self::Parallel(trie) => trie.root(),
}
}
fn update_subtrie_hashes(&mut self) {
match self {
Self::Serial(trie) => trie.update_subtrie_hashes(),
Self::Parallel(trie) => trie.update_subtrie_hashes(),
}
}
fn get_leaf_value(&self, full_path: &Nibbles) -> Option<&Vec<u8>> {
match self {
Self::Serial(trie) => trie.get_leaf_value(full_path),
Self::Parallel(trie) => trie.get_leaf_value(full_path),
}
}
fn find_leaf(
&self,
full_path: &Nibbles,
expected_value: Option<&Vec<u8>>,
) -> Result<LeafLookup, LeafLookupError> {
match self {
Self::Serial(trie) => trie.find_leaf(full_path, expected_value),
Self::Parallel(trie) => trie.find_leaf(full_path, expected_value),
}
}
fn take_updates(&mut self) -> SparseTrieUpdates {
match self {
Self::Serial(trie) => trie.take_updates(),
Self::Parallel(trie) => trie.take_updates(),
}
}
fn wipe(&mut self) {
match self {
Self::Serial(trie) => trie.wipe(),
Self::Parallel(trie) => trie.wipe(),
}
}
fn clear(&mut self) {
match self {
Self::Serial(trie) => trie.clear(),
Self::Parallel(trie) => trie.clear(),
}
}
fn updates_ref(&self) -> Cow<'_, SparseTrieUpdates> {
match self {
Self::Serial(trie) => trie.updates_ref(),
Self::Parallel(trie) => trie.updates_ref(),
}
}
fn shrink_nodes_to(&mut self, size: usize) {
match self {
Self::Serial(trie) => trie.shrink_nodes_to(size),
Self::Parallel(trie) => trie.shrink_nodes_to(size),
}
}
fn shrink_values_to(&mut self, size: usize) {
match self {
Self::Serial(trie) => trie.shrink_values_to(size),
Self::Parallel(trie) => trie.shrink_values_to(size),
}
}
}

View File

@@ -2,10 +2,7 @@
use super::precompile_cache::PrecompileCacheMap;
use crate::tree::{
cached_state::{
CachedStateMetrics, CachedStateProvider, ExecutionCache as StateExecutionCache,
ExecutionCacheBuilder, SavedCache,
},
cached_state::{CachedStateMetrics, CachedStateProvider, ExecutionCache, SavedCache},
payload_processor::{
prewarm::{PrewarmCacheTask, PrewarmContext, PrewarmMode, PrewarmTaskEvent},
sparse_trie::StateRootComputeOutcome,
@@ -44,7 +41,7 @@ use reth_trie_parallel::{
};
use reth_trie_sparse::{
provider::{TrieNodeProvider, TrieNodeProviderFactory},
ClearedSparseStateTrie, SparseStateTrie, SparseTrie,
ClearedSparseStateTrie, RevealableSparseTrie, SparseStateTrie,
};
use reth_trie_sparse_parallel::{ParallelSparseTrie, ParallelismThresholds};
use std::{
@@ -60,15 +57,12 @@ use std::{
use tracing::{debug, debug_span, instrument, warn, Span};
pub mod bal;
mod configured_sparse_trie;
pub mod executor;
pub mod multiproof;
pub mod prewarm;
pub mod receipt_root_task;
pub mod sparse_trie;
use configured_sparse_trie::ConfiguredSparseTrie;
/// Default parallelism thresholds to use with the [`ParallelSparseTrie`].
///
/// These values were determined by performing benchmarks using gradually increasing values to judge
@@ -116,11 +110,11 @@ where
/// The executor used by to spawn tasks.
executor: WorkloadExecutor,
/// The most recent cache used for execution.
execution_cache: ExecutionCache,
execution_cache: PayloadExecutionCache,
/// Metrics for trie operations
trie_metrics: MultiProofTaskMetrics,
/// Cross-block cache size in bytes.
cross_block_cache_size: u64,
cross_block_cache_size: usize,
/// Whether transactions should not be executed on prewarming task.
disable_transaction_prewarming: bool,
/// Whether state cache should be disable
@@ -134,12 +128,8 @@ where
/// A cleared `SparseStateTrie`, kept around to be reused for the state root computation so
/// that allocations can be minimized.
sparse_state_trie: Arc<
parking_lot::Mutex<
Option<ClearedSparseStateTrie<ConfiguredSparseTrie, ConfiguredSparseTrie>>,
>,
parking_lot::Mutex<Option<ClearedSparseStateTrie<ParallelSparseTrie, ParallelSparseTrie>>>,
>,
/// Whether to disable the parallel sparse trie.
disable_parallel_sparse_trie: bool,
/// Maximum concurrency for prewarm task.
prewarm_max_concurrency: usize,
/// Whether to disable cache metrics recording.
@@ -174,7 +164,6 @@ where
precompile_cache_disabled: config.precompile_cache_disabled(),
precompile_cache_map,
sparse_state_trie: Arc::default(),
disable_parallel_sparse_trie: config.disable_parallel_sparse_trie(),
prewarm_max_concurrency: config.prewarm_max_concurrency(),
disable_cache_metrics: config.disable_cache_metrics(),
}
@@ -313,7 +302,7 @@ where
// Build a state provider for the multiproof task
let provider = provider_builder.build().expect("failed to build provider");
let provider = if let Some(saved_cache) = saved_cache {
let (cache, metrics, _) = saved_cache.split();
let (cache, metrics, _disable_metrics) = saved_cache.split();
Box::new(CachedStateProvider::new(provider, cache, metrics))
as Box<dyn StateProvider>
} else {
@@ -495,8 +484,11 @@ where
cache
} else {
debug!("creating new execution cache on cache miss");
let cache = ExecutionCacheBuilder::default().build_caches(self.cross_block_cache_size);
SavedCache::new(parent_hash, cache, CachedStateMetrics::zeroed())
let start = Instant::now();
let cache = ExecutionCache::new(self.cross_block_cache_size);
let metrics = CachedStateMetrics::zeroed();
metrics.record_cache_creation(start.elapsed());
SavedCache::new(parent_hash, cache, metrics)
.with_disable_cache_metrics(self.disable_cache_metrics)
}
}
@@ -514,7 +506,6 @@ where
BPF::StorageNodeProvider: TrieNodeProvider + Send + Sync,
{
let cleared_sparse_trie = Arc::clone(&self.sparse_state_trie);
let disable_parallel_sparse_trie = self.disable_parallel_sparse_trie;
let trie_metrics = self.trie_metrics.clone();
let span = Span::current();
@@ -524,14 +515,10 @@ where
// Reuse a stored SparseStateTrie, or create a new one using the desired configuration
// if there's none to reuse.
let sparse_state_trie = cleared_sparse_trie.lock().take().unwrap_or_else(|| {
let default_trie = SparseTrie::blind_from(if disable_parallel_sparse_trie {
ConfiguredSparseTrie::Serial(Default::default())
} else {
ConfiguredSparseTrie::Parallel(Box::new(
ParallelSparseTrie::default()
.with_parallelism_thresholds(PARALLEL_SPARSE_TRIE_PARALLELISM_THRESHOLDS),
))
});
let default_trie = RevealableSparseTrie::blind_from(
ParallelSparseTrie::default()
.with_parallelism_thresholds(PARALLEL_SPARSE_TRIE_PARALLELISM_THRESHOLDS),
);
ClearedSparseStateTrie::from_state_trie(
SparseStateTrie::new()
.with_accounts_trie(default_trie.clone())
@@ -540,12 +527,13 @@ where
)
});
let task = SparseTrieTask::<_, ConfiguredSparseTrie, ConfiguredSparseTrie>::new_with_cleared_trie(
sparse_trie_rx,
proof_worker_handle,
trie_metrics,
sparse_state_trie,
);
let task =
SparseTrieTask::<_, ParallelSparseTrie, ParallelSparseTrie>::new_with_cleared_trie(
sparse_trie_rx,
proof_worker_handle,
trie_metrics,
sparse_state_trie,
);
let (result, trie) = task.run();
// Send state root computation result
@@ -587,28 +575,27 @@ where
parent_hash = %block_with_parent.parent,
"Cannot find cache for parent hash, skip updating cache with new state for inserted executed block",
);
return;
return
}
// Take existing cache (if any) or create fresh caches
let (caches, cache_metrics) = match cached.take() {
Some(existing) => {
let (c, m, _) = existing.split();
(c, m)
}
let (caches, cache_metrics, _) = match cached.take() {
Some(existing) => existing.split(),
None => (
ExecutionCacheBuilder::default().build_caches(self.cross_block_cache_size),
ExecutionCache::new(self.cross_block_cache_size),
CachedStateMetrics::zeroed(),
false,
),
};
// Insert the block's bundle state into cache
let new_cache = SavedCache::new(block_with_parent.block.hash, caches, cache_metrics)
.with_disable_cache_metrics(disable_cache_metrics);
let new_cache =
SavedCache::new(block_with_parent.block.hash, caches, cache_metrics)
.with_disable_cache_metrics(disable_cache_metrics);
if new_cache.cache().insert_state(bundle_state).is_err() {
*cached = None;
debug!(target: "engine::caching", "cleared execution cache on update error");
return;
return
}
new_cache.update_metrics();
@@ -672,7 +659,7 @@ impl<Tx, Err, R: Send + Sync + 'static> PayloadHandle<Tx, Err, R> {
}
/// Returns a clone of the caches used by prewarming
pub(super) fn caches(&self) -> Option<StateExecutionCache> {
pub(super) fn caches(&self) -> Option<ExecutionCache> {
self.prewarm_handle.saved_cache.as_ref().map(|cache| cache.cache().clone())
}
@@ -776,29 +763,29 @@ impl<R> Drop for CacheTaskHandle<R> {
/// ## Cache Safety
///
/// **CRITICAL**: Cache update operations require exclusive access. All concurrent cache users
/// (such as prewarming tasks) must be terminated before calling `update_with_guard`, otherwise
/// the cache may be corrupted or cleared.
/// (such as prewarming tasks) must be terminated before calling
/// [`PayloadExecutionCache::update_with_guard`], otherwise the cache may be corrupted or cleared.
///
/// ## Cache vs Prewarming Distinction
///
/// **`ExecutionCache`**:
/// **[`PayloadExecutionCache`]**:
/// - Stores parent block's execution state after completion
/// - Used to fetch parent data for next block's execution
/// - Must be exclusively accessed during save operations
///
/// **`PrewarmCacheTask`**:
/// **[`PrewarmCacheTask`]**:
/// - Speculatively loads accounts/storage that might be used in transaction execution
/// - Prepares data for state root proof computation
/// - Runs concurrently but must not interfere with cache saves
#[derive(Clone, Debug, Default)]
struct ExecutionCache {
struct PayloadExecutionCache {
/// Guarded cloneable cache identified by a block hash.
inner: Arc<RwLock<Option<SavedCache>>>,
/// Metrics for cache operations.
metrics: ExecutionCacheMetrics,
}
impl ExecutionCache {
impl PayloadExecutionCache {
/// Returns the cache for `parent_hash` if it's available for use.
///
/// A cache is considered available when:
@@ -834,11 +821,15 @@ impl ExecutionCache {
"Existing cache found"
);
if hash_matches && available {
return Some(c.clone());
}
if hash_matches && !available {
if available {
// If the has is available (no other threads are using it), but has a mismatching
// parent hash, we can just clear it and keep using without re-creating from
// scratch.
if !hash_matches {
c.clear();
}
return Some(c.clone())
} else if hash_matches {
self.metrics.execution_cache_in_use.increment(1);
}
} else {
@@ -911,9 +902,9 @@ where
#[cfg(test)]
mod tests {
use super::ExecutionCache;
use super::PayloadExecutionCache;
use crate::tree::{
cached_state::{CachedStateMetrics, ExecutionCacheBuilder, SavedCache},
cached_state::{CachedStateMetrics, ExecutionCache, SavedCache},
payload_processor::{
evm_state_to_hashed_post_state, executor::WorkloadExecutor, PayloadProcessor,
},
@@ -943,13 +934,13 @@ mod tests {
use std::sync::Arc;
fn make_saved_cache(hash: B256) -> SavedCache {
let execution_cache = ExecutionCacheBuilder::default().build_caches(1_000);
let execution_cache = ExecutionCache::new(1_000);
SavedCache::new(hash, execution_cache, CachedStateMetrics::zeroed())
}
#[test]
fn execution_cache_allows_single_checkout() {
let execution_cache = ExecutionCache::default();
let execution_cache = PayloadExecutionCache::default();
let hash = B256::from([1u8; 32]);
execution_cache.update_with_guard(|slot| *slot = Some(make_saved_cache(hash)));
@@ -968,7 +959,7 @@ mod tests {
#[test]
fn execution_cache_checkout_releases_on_drop() {
let execution_cache = ExecutionCache::default();
let execution_cache = PayloadExecutionCache::default();
let hash = B256::from([2u8; 32]);
execution_cache.update_with_guard(|slot| *slot = Some(make_saved_cache(hash)));
@@ -984,19 +975,21 @@ mod tests {
}
#[test]
fn execution_cache_mismatch_parent_returns_none() {
let execution_cache = ExecutionCache::default();
fn execution_cache_mismatch_parent_clears_and_returns() {
let execution_cache = PayloadExecutionCache::default();
let hash = B256::from([3u8; 32]);
execution_cache.update_with_guard(|slot| *slot = Some(make_saved_cache(hash)));
let miss = execution_cache.get_cache_for(B256::from([4u8; 32]));
assert!(miss.is_none(), "checkout should fail for different parent hash");
// When the parent hash doesn't match, the cache is cleared and returned for reuse
let different_hash = B256::from([4u8; 32]);
let cache = execution_cache.get_cache_for(different_hash);
assert!(cache.is_some(), "cache should be returned for reuse after clearing")
}
#[test]
fn execution_cache_update_after_release_succeeds() {
let execution_cache = ExecutionCache::default();
let execution_cache = PayloadExecutionCache::default();
let initial = B256::from([5u8; 32]);
execution_cache.update_with_guard(|slot| *slot = Some(make_saved_cache(initial)));

View File

@@ -777,10 +777,21 @@ impl MultiProofTask {
// [`MultiAddedRemovedKeys`]. Even if there are not any known removed keys for the account,
// we still want to optimistically fetch extension children for the leaf addition case.
// V2 multiproofs don't need this.
//
// Only clone the AddedRemovedKeys for accounts in the targets, not the entire accumulated
// set, to avoid O(n) cloning with many buffered blocks.
let multi_added_removed_keys =
if let VersionedMultiProofTargets::Legacy(legacy_targets) = &targets {
self.multi_added_removed_keys.touch_accounts(legacy_targets.keys().copied());
Some(Arc::new(self.multi_added_removed_keys.clone()))
Some(Arc::new(MultiAddedRemovedKeys {
account: self.multi_added_removed_keys.account.clone(),
storages: legacy_targets
.keys()
.filter_map(|k| {
self.multi_added_removed_keys.storages.get(k).map(|v| (*k, v.clone()))
})
.collect(),
}))
} else {
None
};
@@ -1516,8 +1527,9 @@ where
#[cfg(test)]
mod tests {
use crate::tree::cached_state::CachedStateProvider;
use super::*;
use crate::tree::cached_state::{CachedStateProvider, ExecutionCacheBuilder};
use alloy_eip7928::{AccountChanges, BalanceChange};
use alloy_primitives::Address;
use reth_provider::{
@@ -1577,7 +1589,7 @@ mod tests {
{
let db_provider = factory.database_provider_ro().unwrap();
let state_provider: StateProviderBox = Box::new(LatestStateProvider::new(db_provider));
let cache = ExecutionCacheBuilder::default().build_caches(1000);
let cache = crate::tree::cached_state::ExecutionCache::new(1000);
CachedStateProvider::new(state_provider, cache, Default::default())
}

View File

@@ -17,17 +17,16 @@ use crate::tree::{
bal::{total_slots, BALSlotIter},
executor::WorkloadExecutor,
multiproof::{MultiProofMessage, VersionedMultiProofTargets},
ExecutionCache as PayloadExecutionCache,
PayloadExecutionCache,
},
precompile_cache::{CachedPrecompile, PrecompileCacheMap},
ExecutionEnv, StateProviderBuilder,
};
use alloy_consensus::transaction::TxHashRef;
use alloy_eip7928::BlockAccessList;
use alloy_eips::Typed2718;
use alloy_evm::Database;
use alloy_primitives::{keccak256, map::B256Set, B256};
use crossbeam_channel::Sender as CrossbeamSender;
use crossbeam_channel::{Receiver as CrossbeamReceiver, Sender as CrossbeamSender};
use metrics::{Counter, Gauge, Histogram};
use reth_evm::{execute::ExecutableTxFor, ConfigureEvm, Evm, EvmFor, RecoveredTx, SpecFor};
use reth_metrics::Metrics;
@@ -66,19 +65,6 @@ struct IndexedTransaction<Tx> {
tx: Tx,
}
/// Maximum standard Ethereum transaction type value.
///
/// Standard transaction types are:
/// - Type 0: Legacy transactions (original Ethereum)
/// - Type 1: EIP-2930 (access list transactions)
/// - Type 2: EIP-1559 (dynamic fee transactions)
/// - Type 3: EIP-4844 (blob transactions)
/// - Type 4: EIP-7702 (set code authorization transactions)
///
/// Any transaction with a type > 4 is considered a non-standard/system transaction,
/// typically used by L2s for special purposes (e.g., Optimism deposit transactions use type 126).
const MAX_STANDARD_TX_TYPE: u8 = 4;
/// A task that is responsible for caching and prewarming the cache by executing transactions
/// individually in parallel.
///
@@ -177,8 +163,8 @@ where
transaction_count_hint.min(max_concurrency)
};
// Initialize worker handles container
let handles = ctx.clone().spawn_workers(workers_needed, &executor, actions_tx.clone(), done_tx.clone());
// Spawn workers
let tx_sender = ctx.clone().spawn_workers(workers_needed, &executor, actions_tx.clone(), done_tx.clone());
// Distribute transactions to workers
let mut tx_index = 0usize;
@@ -193,37 +179,18 @@ where
}
let indexed_tx = IndexedTransaction { index: tx_index, tx };
let is_system_tx = indexed_tx.tx.tx().ty() > MAX_STANDARD_TX_TYPE;
// System transactions (type > 4) in the first position set critical metadata
// that affects all subsequent transactions (e.g., L1 block info on L2s).
// Broadcast the first system transaction to all workers to ensure they have
// the critical state. This is particularly important for L2s like Optimism
// where the first deposit transaction (type 126) contains essential block metadata.
if tx_index == 0 && is_system_tx {
for handle in &handles {
// Ignore send errors: workers listen to terminate_execution and may
// exit early when signaled. Sending to a disconnected worker is
// possible and harmless and should happen at most once due to
// the terminate_execution check above.
let _ = handle.send(indexed_tx.clone());
}
} else {
// Round-robin distribution for all other transactions
let worker_idx = tx_index % workers_needed;
// Ignore send errors: workers listen to terminate_execution and may
// exit early when signaled. Sending to a disconnected worker is
// possible and harmless and should happen at most once due to
// the terminate_execution check above.
let _ = handles[worker_idx].send(indexed_tx);
}
// Send transaction to the workers
// Ignore send errors: workers listen to terminate_execution and may
// exit early when signaled.
let _ = tx_sender.send(indexed_tx);
tx_index += 1;
}
// drop handle and wait for all tasks to finish and drop theirs
// drop sender and wait for all tasks to finish
drop(done_tx);
drop(handles);
drop(tx_sender);
while done_rx.recv().is_ok() {}
let _ = actions_tx
@@ -562,7 +529,7 @@ where
Some((evm, metrics, terminate_execution, v2_proofs_enabled))
}
/// Accepts an [`mpsc::Receiver`] of transactions and a handle to prewarm task. Executes
/// Accepts a [`CrossbeamReceiver`] of transactions and a handle to prewarm task. Executes
/// transactions and streams [`PrewarmTaskEvent::Outcome`] messages for each transaction.
///
/// This function processes transactions sequentially from the receiver and emits outcome events
@@ -574,7 +541,7 @@ where
#[instrument(level = "debug", target = "engine::tree::payload_processor::prewarm", skip_all)]
fn transact_batch<Tx>(
self,
txs: mpsc::Receiver<IndexedTransaction<Tx>>,
txs: CrossbeamReceiver<IndexedTransaction<Tx>>,
sender: Sender<PrewarmTaskEvent<N::Receipt>>,
done_tx: Sender<()>,
) where
@@ -661,35 +628,31 @@ where
let _ = done_tx.send(());
}
/// Spawns a worker task for transaction execution and returns its sender channel.
/// Spawns worker tasks that pull transactions from a shared channel.
///
/// Returns the sender for distributing transactions to workers.
fn spawn_workers<Tx>(
self,
workers_needed: usize,
task_executor: &WorkloadExecutor,
actions_tx: Sender<PrewarmTaskEvent<N::Receipt>>,
done_tx: Sender<()>,
) -> Vec<mpsc::Sender<IndexedTransaction<Tx>>>
) -> CrossbeamSender<IndexedTransaction<Tx>>
where
Tx: ExecutableTxFor<Evm> + Send + 'static,
{
let mut handles = Vec::with_capacity(workers_needed);
let mut receivers = Vec::with_capacity(workers_needed);
let (tx_sender, tx_receiver) = crossbeam_channel::unbounded();
for _ in 0..workers_needed {
let (tx, rx) = mpsc::channel();
handles.push(tx);
receivers.push(rx);
}
// Spawn a separate task spawning workers in parallel.
// Spawn workers that all pull from the shared receiver
let executor = task_executor.clone();
let span = Span::current();
task_executor.spawn_blocking(move || {
let _enter = span.entered();
for (idx, rx) in receivers.into_iter().enumerate() {
for idx in 0..workers_needed {
let ctx = self.clone();
let actions_tx = actions_tx.clone();
let done_tx = done_tx.clone();
let rx = tx_receiver.clone();
let span = debug_span!(target: "engine::tree::payload_processor::prewarm", "prewarm worker", idx);
executor.spawn_blocking(move || {
let _enter = span.entered();
@@ -698,7 +661,7 @@ where
}
});
handles
tx_sender
}
/// Spawns a worker task for BAL slot prefetching.

View File

@@ -8,7 +8,7 @@ use reth_trie_parallel::{proof_task::ProofResult, root::ParallelStateRootError};
use reth_trie_sparse::{
errors::{SparseStateTrieResult, SparseTrieErrorKind},
provider::{TrieNodeProvider, TrieNodeProviderFactory},
ClearedSparseStateTrie, SerialSparseTrie, SparseStateTrie, SparseTrieInterface,
ClearedSparseStateTrie, SerialSparseTrie, SparseStateTrie, SparseTrie,
};
use smallvec::SmallVec;
use std::{
@@ -38,8 +38,8 @@ where
BPF: TrieNodeProviderFactory + Send + Sync + Clone,
BPF::AccountNodeProvider: TrieNodeProvider + Send + Sync,
BPF::StorageNodeProvider: TrieNodeProvider + Send + Sync,
A: SparseTrieInterface + Send + Sync + Default,
S: SparseTrieInterface + Send + Sync + Default + Clone,
A: SparseTrie + Send + Sync + Default,
S: SparseTrie + Send + Sync + Default + Clone,
{
/// Creates a new sparse trie, pre-populating with a [`ClearedSparseStateTrie`].
pub(super) fn new_with_cleared_trie(
@@ -150,8 +150,8 @@ where
BPF: TrieNodeProviderFactory + Send + Sync,
BPF::AccountNodeProvider: TrieNodeProvider + Send + Sync,
BPF::StorageNodeProvider: TrieNodeProvider + Send + Sync,
A: SparseTrieInterface + Send + Sync + Default,
S: SparseTrieInterface + Send + Sync + Default + Clone,
A: SparseTrie + Send + Sync + Default,
S: SparseTrie + Send + Sync + Default + Clone,
{
trace!(target: "engine::root::sparse", "Updating sparse trie");
let started_at = Instant::now();

View File

@@ -503,6 +503,7 @@ where
let root_time = Instant::now();
let mut maybe_state_root = None;
let mut state_root_task_failed = false;
match strategy {
StateRootStrategy::StateRootTask => {
@@ -521,10 +522,12 @@ where
block_state_root = ?block.header().state_root(),
"State root task returned incorrect state root"
);
state_root_task_failed = true;
}
}
Err(error) => {
debug!(target: "engine::tree::payload_validator", %error, "State root task failed");
state_root_task_failed = true;
}
}
}
@@ -569,6 +572,11 @@ where
self.compute_state_root_serial(overlay_factory.clone(), &hashed_state),
block
);
if state_root_task_failed {
self.metrics.block_validation.state_root_task_fallback_success_total.increment(1);
}
(root, updates, root_time.elapsed())
};

View File

@@ -52,7 +52,7 @@ pub fn read_dir(
checksums.next().transpose()?.ok_or_eyre("Got less checksums than ERA files")?;
}
entries.sort_by(|(left, _), (right, _)| left.cmp(right));
entries.sort_by_key(|(left, _)| *left);
Ok(stream::iter(entries.into_iter().skip_while(move |(n, _)| *n < start_index).map(
move |(_, path)| {

View File

@@ -20,6 +20,7 @@ use reth_era::{
},
};
use reth_fs_util as fs;
use reth_primitives_traits::Block;
use reth_storage_api::{BlockNumReader, BlockReader, HeaderProvider};
use std::{
path::PathBuf,
@@ -295,9 +296,11 @@ where
return Err(eyre!("Expected block {expected_block_number}, got {actual_block_number}"));
}
// CompressedBody must contain the block *body* (rlp(body)), not the full block (rlp(block)).
let body = provider
.block_by_number(actual_block_number)?
.ok_or_else(|| eyre!("Block body not found for block {}", actual_block_number))?;
.ok_or_else(|| eyre!("Block not found for block {}", actual_block_number))?
.into_body();
let receipts = provider
.receipts_by_block(actual_block_number.into())?

View File

@@ -59,6 +59,7 @@ std = [
"reth-storage-errors/std",
]
test-utils = [
"std",
"dep:parking_lot",
"dep:derive_more",
"reth-chainspec/test-utils",

View File

@@ -18,7 +18,7 @@ use reth_evm::{
};
use reth_network::{primitives::BasicNetworkPrimitives, NetworkHandle, PeersInfo};
use reth_node_api::{
AddOnsContext, BlockTy, FullNodeComponents, HeaderTy, NodeAddOns, NodePrimitives,
AddOnsContext, FullNodeComponents, HeaderTy, NodeAddOns, NodePrimitives,
PayloadAttributesBuilder, PrimitivesTy, TxTy,
};
use reth_node_builder::{
@@ -53,8 +53,8 @@ use reth_rpc_eth_types::{error::FromEvmError, EthApiError};
use reth_rpc_server_types::RethRpcModule;
use reth_tracing::tracing::{debug, info};
use reth_transaction_pool::{
blobstore::DiskFileBlobStore, EthPooledTransaction, EthTransactionPool, PoolPooledTx,
PoolTransaction, TransactionPool, TransactionValidationTaskExecutor,
blobstore::DiskFileBlobStore, EthTransactionPool, PoolPooledTx, PoolTransaction,
TransactionPool, TransactionValidationTaskExecutor,
};
use revm::context::TxEnv;
use std::{marker::PhantomData, sync::Arc, time::SystemTime};
@@ -456,18 +456,22 @@ pub struct EthereumPoolBuilder {
// TODO add options for txpool args
}
impl<Types, Node> PoolBuilder<Node> for EthereumPoolBuilder
impl<Types, Node, Evm> PoolBuilder<Node, Evm> for EthereumPoolBuilder
where
Types: NodeTypes<
ChainSpec: EthereumHardforks,
Primitives: NodePrimitives<SignedTx = TransactionSigned>,
>,
Node: FullNodeTypes<Types = Types>,
Evm: ConfigureEvm<Primitives = PrimitivesTy<Types>> + Clone + 'static,
{
type Pool =
EthTransactionPool<Node::Provider, DiskFileBlobStore, EthPooledTransaction, BlockTy<Types>>;
type Pool = EthTransactionPool<Node::Provider, DiskFileBlobStore, Evm>;
async fn build_pool(self, ctx: &BuilderContext<Node>) -> eyre::Result<Self::Pool> {
async fn build_pool(
self,
ctx: &BuilderContext<Node>,
evm_config: Evm,
) -> eyre::Result<Self::Pool> {
let pool_config = ctx.pool_config();
let blobs_disabled = ctx.config().txpool.disable_blobs_support ||
@@ -493,17 +497,17 @@ where
let blob_store =
reth_node_builder::components::create_blob_store_with_cache(ctx, blob_cache_size)?;
let validator = TransactionValidationTaskExecutor::eth_builder(ctx.provider().clone())
.with_head_timestamp(ctx.head().timestamp)
.set_eip4844(!blobs_disabled)
.kzg_settings(ctx.kzg_settings()?)
.with_max_tx_input_bytes(ctx.config().txpool.max_tx_input_bytes)
.with_local_transactions_config(pool_config.local_transactions_config.clone())
.set_tx_fee_cap(ctx.config().rpc.rpc_tx_fee_cap)
.with_max_tx_gas_limit(ctx.config().txpool.max_tx_gas_limit)
.with_minimum_priority_fee(ctx.config().txpool.minimum_priority_fee)
.with_additional_tasks(ctx.config().txpool.additional_validation_tasks)
.build_with_tasks(ctx.task_executor().clone(), blob_store.clone());
let validator =
TransactionValidationTaskExecutor::eth_builder(ctx.provider().clone(), evm_config)
.set_eip4844(!blobs_disabled)
.kzg_settings(ctx.kzg_settings()?)
.with_max_tx_input_bytes(ctx.config().txpool.max_tx_input_bytes)
.with_local_transactions_config(pool_config.local_transactions_config.clone())
.set_tx_fee_cap(ctx.config().rpc.rpc_tx_fee_cap)
.with_max_tx_gas_limit(ctx.config().txpool.max_tx_gas_limit)
.with_minimum_priority_fee(ctx.config().txpool.minimum_priority_fee)
.with_additional_tasks(ctx.config().txpool.additional_validation_tasks)
.build_with_tasks(ctx.task_executor().clone(), blob_store.clone());
if validator.validator().eip4844() {
// initializing the KZG settings can be expensive, this should be done upfront so that

View File

@@ -9,6 +9,7 @@ mod p2p;
mod pool;
mod prestate;
mod rpc;
mod selfdestruct;
mod utils;
const fn main() {}

View File

@@ -0,0 +1,529 @@
//! E2E tests for SELFDESTRUCT behavior and output state verification.
//!
//! These tests verify that:
//! - Pre-Dencun: SELFDESTRUCT clears storage and code, output state reflects this
//! - Post-Dencun (EIP-6780): SELFDESTRUCT only works in same-tx creation, state persists
//!
//! We disable prewarming to ensure deterministic cache behavior and verify the execution
//! output state contains the expected account status after SELFDESTRUCT.
use crate::utils::{eth_payload_attributes, eth_payload_attributes_shanghai};
use alloy_network::{EthereumWallet, TransactionBuilder};
use alloy_primitives::{bytes, Address, Bytes, TxKind, U256};
use alloy_provider::{Provider, ProviderBuilder};
use alloy_rpc_types_eth::TransactionRequest;
use futures::StreamExt;
use reth_chainspec::{ChainSpec, ChainSpecBuilder, MAINNET};
use reth_e2e_test_utils::setup_engine;
use reth_node_api::TreeConfig;
use reth_node_ethereum::EthereumNode;
use reth_revm::db::BundleAccount;
use std::sync::Arc;
const MAX_FEE_PER_GAS: u128 = 20_000_000_000;
const MAX_PRIORITY_FEE_PER_GAS: u128 = 1_000_000_000;
fn cancun_spec() -> Arc<ChainSpec> {
Arc::new(
ChainSpecBuilder::default()
.chain(MAINNET.chain)
.genesis(serde_json::from_str(include_str!("../assets/genesis.json")).unwrap())
.cancun_activated()
.build(),
)
}
fn shanghai_spec() -> Arc<ChainSpec> {
Arc::new(
ChainSpecBuilder::default()
.chain(MAINNET.chain)
.genesis(serde_json::from_str(include_str!("../assets/genesis.json")).unwrap())
.shanghai_activated()
.build(),
)
}
fn deploy_tx(from: Address, nonce: u64, init_code: Bytes) -> TransactionRequest {
TransactionRequest::default()
.with_from(from)
.with_nonce(nonce)
.with_gas_limit(500_000)
.with_max_fee_per_gas(MAX_FEE_PER_GAS)
.with_max_priority_fee_per_gas(MAX_PRIORITY_FEE_PER_GAS)
.with_input(init_code)
.with_kind(TxKind::Create)
}
fn call_tx(from: Address, to: Address, nonce: u64) -> TransactionRequest {
TransactionRequest::default()
.with_from(from)
.with_to(to)
.with_nonce(nonce)
.with_gas_limit(100_000)
.with_max_fee_per_gas(MAX_FEE_PER_GAS)
.with_max_priority_fee_per_gas(MAX_PRIORITY_FEE_PER_GAS)
}
fn transfer_tx(from: Address, to: Address, nonce: u64, value: U256) -> TransactionRequest {
TransactionRequest::default()
.with_from(from)
.with_to(to)
.with_nonce(nonce)
.with_value(value)
.with_gas_limit(21_000)
.with_max_fee_per_gas(MAX_FEE_PER_GAS)
.with_max_priority_fee_per_gas(MAX_PRIORITY_FEE_PER_GAS)
}
/// Creates init code for a contract that selfdestructs during deployment (same tx).
/// This tests the EIP-6780 exception where SELFDESTRUCT in same tx as creation still works.
///
/// The contract:
/// 1. Stores 0x42 at slot 0
/// 2. Immediately selfdestructs to beneficiary (during init, before returning runtime)
fn selfdestruct_in_constructor_init_code() -> Bytes {
// Init code that selfdestructs during deployment:
// PUSH1 0x42, PUSH1 0x00, SSTORE (store 0x42 at slot 0)
// PUSH20 <beneficiary>, SELFDESTRUCT
let mut init = Vec::new();
init.extend_from_slice(&[0x60, 0x42, 0x60, 0x00, 0x55]); // PUSH1 0x42, PUSH1 0x00, SSTORE
init.extend_from_slice(&[
0x73, // PUSH20
0xde, 0xad, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
0x00, 0x00, 0x00, 0x00, 0x01, // beneficiary address
]);
init.push(0xff); // SELFDESTRUCT
Bytes::from(init)
}
/// Creates init code for a simple contract that:
/// 1. Stores 0x42 at slot 0 during deployment
/// 2. On any call: selfdestructs to beneficiary
///
/// This simpler contract avoids complex branching logic.
fn selfdestruct_contract_init_code() -> Bytes {
// Runtime: just selfdestruct on any call
// PUSH20 <beneficiary>
// SELFDESTRUCT
let runtime = bytes!(
"73dead000000000000000000000000000000000001" // PUSH20 beneficiary
"ff" // SELFDESTRUCT
);
let runtime_len = runtime.len(); // 22 bytes
// Init code: SSTORE(0, 0x42), CODECOPY, RETURN
// Total init code before runtime = 17 bytes
let init_len: u8 = 17;
let mut init = Vec::new();
init.extend_from_slice(&[0x60, 0x42, 0x60, 0x00, 0x55]); // PUSH1 0x42, PUSH1 0x00, SSTORE
init.extend_from_slice(&[0x60, runtime_len as u8, 0x60, init_len, 0x60, 0x00, 0x39]); // CODECOPY
init.extend_from_slice(&[0x60, runtime_len as u8, 0x60, 0x00, 0xf3]); // RETURN
init.extend_from_slice(&runtime);
Bytes::from(init)
}
/// Tests SELFDESTRUCT behavior post-Dencun (Cancun+).
///
/// Post-Dencun (EIP-6780):
/// - SELFDESTRUCT only deletes contract if called in same tx as creation
/// - For existing contracts, SELFDESTRUCT only sends balance, code/storage persist
/// - The output state should NOT mark the account as destroyed
///
/// This test verifies:
/// 1. Contract deploys with storage
/// 2. SELFDESTRUCT in later tx does NOT delete code/storage
/// 3. Output state shows account is NOT destroyed
#[tokio::test]
async fn test_selfdestruct_post_dencun() -> eyre::Result<()> {
reth_tracing::init_test_tracing();
let tree_config = TreeConfig::default().without_prewarming(true).without_state_cache(false);
let (mut nodes, _tasks, wallet) =
setup_engine::<EthereumNode>(1, cancun_spec(), false, tree_config, eth_payload_attributes)
.await?;
let mut node = nodes.pop().unwrap();
let signer = wallet.inner.clone();
let provider = ProviderBuilder::new()
.wallet(EthereumWallet::new(signer.clone()))
.connect_http(node.rpc_url());
// Deploy contract that stores 0x42 at slot 0 and selfdestructs on any call
let pending = provider
.send_transaction(deploy_tx(signer.address(), 0, selfdestruct_contract_init_code()))
.await?;
node.advance_block().await?;
let receipt = pending.get_receipt().await?;
assert!(receipt.status(), "Contract deployment should succeed");
let contract_address = receipt.contract_address.expect("Should have contract address");
// Consume the canonical notification for deployment block
let _ = node.canonical_stream.next().await;
// Trigger SELFDESTRUCT by calling the contract
let pending = provider.send_transaction(call_tx(signer.address(), contract_address, 1)).await?;
node.advance_block().await?;
let receipt = pending.get_receipt().await?;
assert!(receipt.status(), "Selfdestruct tx should succeed");
// Get the canonical notification for the selfdestruct block
let notification = node.canonical_stream.next().await.unwrap();
let chain = notification.committed();
let execution_outcome = chain.execution_outcome();
// Verify the output state: post-Dencun, account should NOT be destroyed
let account_state: Option<&BundleAccount> = execution_outcome.bundle.account(&contract_address);
assert!(
account_state.is_none() || !account_state.unwrap().was_destroyed(),
"Post-Dencun (EIP-6780): Account should NOT be destroyed when SELFDESTRUCT called on existing contract"
);
// Verify via RPC that code and storage persist
let code_after = provider.get_code_at(contract_address).await?;
assert!(!code_after.is_empty(), "Post-Dencun: Contract code should persist");
let slot0_after = provider.get_storage_at(contract_address, U256::ZERO).await?;
assert_eq!(slot0_after, U256::from(0x42), "Post-Dencun: Storage should persist");
// Send another transaction to the contract address in a new block.
// This tests cache behavior - if cache has stale data, execution would be incorrect.
// Post-Dencun: calling the contract should trigger SELFDESTRUCT again (but only transfer
// balance)
let pending = provider.send_transaction(call_tx(signer.address(), contract_address, 2)).await?;
node.advance_block().await?;
let receipt = pending.get_receipt().await?;
assert!(receipt.status(), "Second call to contract should succeed");
// Consume the canonical notification
let notification = node.canonical_stream.next().await.unwrap();
let chain = notification.committed();
let execution_outcome = chain.execution_outcome();
// Verify the output state still shows account NOT destroyed
let account_state: Option<&BundleAccount> = execution_outcome.bundle.account(&contract_address);
assert!(
account_state.is_none() || !account_state.unwrap().was_destroyed(),
"Post-Dencun: Account should still NOT be destroyed after second SELFDESTRUCT call"
);
// Verify code and storage still persist after the second call
let code_final = provider.get_code_at(contract_address).await?;
assert!(!code_final.is_empty(), "Post-Dencun: Contract code should still persist");
let slot0_final = provider.get_storage_at(contract_address, U256::ZERO).await?;
assert_eq!(slot0_final, U256::from(0x42), "Post-Dencun: Storage should still persist");
Ok(())
}
/// Tests SELFDESTRUCT in same transaction as creation (post-Dencun).
///
/// Post-Dencun (EIP-6780):
/// - SELFDESTRUCT during the same transaction as creation DOES delete the contract
/// - This is the exception to the rule that SELFDESTRUCT no longer deletes contracts
///
/// This test verifies:
/// 1. Contract selfdestructs during its constructor
/// 2. Contract is deleted (same-tx exception applies)
/// 3. No code or storage remains
/// 4. Since account never existed in DB before, bundle has no entry for it
#[tokio::test]
async fn test_selfdestruct_same_tx_post_dencun() -> eyre::Result<()> {
reth_tracing::init_test_tracing();
let tree_config = TreeConfig::default().without_prewarming(true).without_state_cache(false);
let (mut nodes, _tasks, wallet) =
setup_engine::<EthereumNode>(1, cancun_spec(), false, tree_config, eth_payload_attributes)
.await?;
let mut node = nodes.pop().unwrap();
let signer = wallet.inner.clone();
let provider = ProviderBuilder::new()
.wallet(EthereumWallet::new(signer.clone()))
.connect_http(node.rpc_url());
// Deploy contract that selfdestructs during its constructor
let pending = provider
.send_transaction(deploy_tx(signer.address(), 0, selfdestruct_in_constructor_init_code()))
.await?;
node.advance_block().await?;
let receipt = pending.get_receipt().await?;
assert!(receipt.status(), "Contract deployment with selfdestruct should succeed");
// Calculate the contract address (CREATE uses sender + nonce)
let contract_address = signer.address().create(0);
// Get the canonical notification for the deployment block
let notification = node.canonical_stream.next().await.unwrap();
let chain = notification.committed();
let execution_outcome = chain.execution_outcome();
// Verify the output state: same-tx SELFDESTRUCT should destroy the account
let account_state: Option<&BundleAccount> = execution_outcome.bundle.account(&contract_address);
assert!(
account_state.is_none(),
"Post-Dencun same-tx: Account was created and selfdestructed in the same transaction, no trace in bundle state"
);
// Verify via RPC that code and storage are cleared
let code = provider.get_code_at(contract_address).await?;
assert!(code.is_empty(), "Post-Dencun same-tx: Contract code should be deleted");
let slot0 = provider.get_storage_at(contract_address, U256::ZERO).await?;
assert_eq!(slot0, U256::ZERO, "Post-Dencun same-tx: Storage should be cleared");
// Send ETH to the destroyed address in a new block to test cache behavior
let pending = provider
.send_transaction(transfer_tx(signer.address(), contract_address, 1, U256::from(1000)))
.await?;
node.advance_block().await?;
let receipt = pending.get_receipt().await?;
assert!(receipt.status(), "ETH transfer to destroyed address should succeed");
// Consume the canonical notification
let _ = node.canonical_stream.next().await;
// Verify code is still empty and account received ETH
let code_final = provider.get_code_at(contract_address).await?;
assert!(code_final.is_empty(), "Post-Dencun same-tx: Contract code should remain deleted");
let balance = provider.get_balance(contract_address).await?;
assert_eq!(balance, U256::from(1000), "Post-Dencun same-tx: Account should have received ETH");
Ok(())
}
/// Tests SELFDESTRUCT behavior pre-Dencun (Shanghai).
///
/// Pre-Dencun:
/// - SELFDESTRUCT deletes contract code and storage regardless of when contract was created
/// - The output state MUST mark the account as destroyed
///
/// This test verifies:
/// 1. Contract deploys with storage
/// 2. SELFDESTRUCT deletes code and storage
/// 3. Output state shows account IS destroyed
#[tokio::test]
async fn test_selfdestruct_pre_dencun() -> eyre::Result<()> {
reth_tracing::init_test_tracing();
let tree_config = TreeConfig::default().without_prewarming(true).without_state_cache(false);
let (mut nodes, _tasks, wallet) = setup_engine::<EthereumNode>(
1,
shanghai_spec(),
false,
tree_config,
eth_payload_attributes_shanghai,
)
.await?;
let mut node = nodes.pop().unwrap();
let signer = wallet.inner.clone();
let provider = ProviderBuilder::new()
.wallet(EthereumWallet::new(signer.clone()))
.connect_http(node.rpc_url());
// Deploy contract that stores 0x42 at slot 0 and selfdestructs on any call
let pending = provider
.send_transaction(deploy_tx(signer.address(), 0, selfdestruct_contract_init_code()))
.await?;
node.advance_block().await?;
let receipt = pending.get_receipt().await?;
assert!(receipt.status(), "Contract deployment should succeed");
let contract_address = receipt.contract_address.expect("Should have contract address");
// Consume the canonical notification for deployment block
let _ = node.canonical_stream.next().await;
// Trigger SELFDESTRUCT by calling the contract
let pending = provider.send_transaction(call_tx(signer.address(), contract_address, 1)).await?;
node.advance_block().await?;
let receipt = pending.get_receipt().await?;
assert!(receipt.status(), "Selfdestruct tx should succeed");
// Get the canonical notification for the selfdestruct block
let notification = node.canonical_stream.next().await.unwrap();
let chain = notification.committed();
let execution_outcome = chain.execution_outcome();
// Verify the output state: pre-Dencun, account MUST be destroyed
let account_state: Option<&BundleAccount> = execution_outcome.bundle.account(&contract_address);
assert!(
account_state.is_some_and(|a: &BundleAccount| a.was_destroyed()),
"Pre-Dencun: Account MUST be marked as destroyed in output state"
);
// Verify via RPC that code and storage are cleared
let code_after = provider.get_code_at(contract_address).await?;
assert!(code_after.is_empty(), "Pre-Dencun: Contract code should be deleted");
let slot0_after = provider.get_storage_at(contract_address, U256::ZERO).await?;
assert_eq!(slot0_after, U256::ZERO, "Pre-Dencun: Storage should be cleared");
// Send ETH to the destroyed contract address in a new block.
// This tests cache behavior - the cache should correctly reflect the account was destroyed.
// Pre-Dencun: the contract no longer exists, so this is just a plain ETH transfer.
let pending = provider
.send_transaction(transfer_tx(signer.address(), contract_address, 2, U256::from(1000)))
.await?;
node.advance_block().await?;
let receipt = pending.get_receipt().await?;
assert!(receipt.status(), "ETH transfer to destroyed contract address should succeed");
// Consume the canonical notification
let notification = node.canonical_stream.next().await.unwrap();
let chain = notification.committed();
let execution_outcome = chain.execution_outcome();
// Verify the output state shows the account exists (received ETH) but has no code
let account_state: Option<&BundleAccount> = execution_outcome.bundle.account(&contract_address);
// After receiving ETH, the account should exist with balance but no code
assert!(
account_state.is_some(),
"Pre-Dencun: Account should exist after receiving ETH (even though contract was destroyed)"
);
// Verify code is still empty (contract was destroyed, only ETH was received)
let code_final = provider.get_code_at(contract_address).await?;
assert!(code_final.is_empty(), "Pre-Dencun: Contract code should remain deleted");
// Verify storage is still cleared
let slot0_final = provider.get_storage_at(contract_address, U256::ZERO).await?;
assert_eq!(slot0_final, U256::ZERO, "Pre-Dencun: Storage should remain cleared");
// Verify the account now has the ETH balance we sent
let balance = provider.get_balance(contract_address).await?;
assert_eq!(balance, U256::from(1000), "Pre-Dencun: Account should have received ETH");
Ok(())
}
/// Tests SELFDESTRUCT in same transaction as creation, where account previously had ETH
/// (post-Dencun).
///
/// Post-Dencun (EIP-6780):
/// - The same-tx exception applies when the CONTRACT is created in that transaction
/// - Even if the address previously had ETH (as an EOA), deploying a contract there and
/// selfdestructing in the same tx DOES delete the contract
/// - The "created in same tx" refers to contract creation, not account existence
///
/// This test verifies:
/// 1. Send ETH to the future contract address (address has balance but no code)
/// 2. Deploy contract that selfdestructs during constructor to that address
/// 3. Contract is deleted (same-tx exception applies - contract was created this tx)
/// 4. Code and storage are cleared
/// 5. Since account existed in DB before (had ETH), bundle marks it as Destroyed
#[tokio::test]
async fn test_selfdestruct_same_tx_preexisting_account_post_dencun() -> eyre::Result<()> {
reth_tracing::init_test_tracing();
let tree_config = TreeConfig::default().without_prewarming(true).without_state_cache(false);
let (mut nodes, _tasks, wallet) =
setup_engine::<EthereumNode>(1, cancun_spec(), false, tree_config, eth_payload_attributes)
.await?;
let mut node = nodes.pop().unwrap();
let signer = wallet.inner.clone();
let provider = ProviderBuilder::new()
.wallet(EthereumWallet::new(signer.clone()))
.connect_http(node.rpc_url());
// Calculate where the contract will be deployed (CREATE uses sender + nonce)
// We'll use nonce 1 for deployment, so first send ETH with nonce 0
let future_contract_address = signer.address().create(1);
// Send ETH to the future contract address first (makes it a pre-existing account)
let pending = provider
.send_transaction(transfer_tx(
signer.address(),
future_contract_address,
0,
U256::from(1000),
))
.await?;
node.advance_block().await?;
let receipt = pending.get_receipt().await?;
assert!(receipt.status(), "ETH transfer should succeed");
// Consume the canonical notification
let _ = node.canonical_stream.next().await;
// Verify the account exists and has balance
let balance_before = provider.get_balance(future_contract_address).await?;
assert_eq!(balance_before, U256::from(1000), "Account should have ETH before deployment");
// Now deploy contract that selfdestructs during its constructor to the same address
let pending = provider
.send_transaction(deploy_tx(signer.address(), 1, selfdestruct_in_constructor_init_code()))
.await?;
node.advance_block().await?;
let receipt = pending.get_receipt().await?;
assert!(receipt.status(), "Contract deployment with selfdestruct should succeed");
// Verify deployment went to the expected address
assert_eq!(
receipt.contract_address,
Some(future_contract_address),
"Contract should be deployed to pre-computed address"
);
// Get the canonical notification for the deployment block
let notification = node.canonical_stream.next().await.unwrap();
let chain = notification.committed();
let execution_outcome = chain.execution_outcome();
// Verify the output state: same-tx exception DOES apply because contract was created this tx
// The account should be marked as destroyed. Since it had prior state (ETH balance),
// the bundle will contain it with status Destroyed and original_info set.
let account_state: Option<&BundleAccount> =
execution_outcome.bundle.account(&future_contract_address);
assert!(
account_state.is_some_and(|a| a.was_destroyed()),
"Post-Dencun same-tx with prior ETH: Account MUST be marked as destroyed"
);
// Verify via RPC that code and storage are cleared
let code = provider.get_code_at(future_contract_address).await?;
assert!(code.is_empty(), "Post-Dencun same-tx: Contract code should be deleted");
let slot0 = provider.get_storage_at(future_contract_address, U256::ZERO).await?;
assert_eq!(slot0, U256::ZERO, "Post-Dencun same-tx: Storage should be cleared");
// Balance should be zero (sent to beneficiary during SELFDESTRUCT)
let balance_after = provider.get_balance(future_contract_address).await?;
assert_eq!(
balance_after,
U256::ZERO,
"Post-Dencun same-tx: Balance should be zero (sent to beneficiary)"
);
// Send ETH to the destroyed address to verify cache behavior
let pending = provider
.send_transaction(transfer_tx(
signer.address(),
future_contract_address,
2,
U256::from(2000),
))
.await?;
node.advance_block().await?;
let receipt = pending.get_receipt().await?;
assert!(receipt.status(), "ETH transfer should succeed");
// Consume notification
let _ = node.canonical_stream.next().await;
// Verify the account received ETH and has no code (it's now just an EOA)
let balance_final = provider.get_balance(future_contract_address).await?;
assert_eq!(balance_final, U256::from(2000), "Account should have received ETH");
let code_final = provider.get_code_at(future_contract_address).await?;
assert!(code_final.is_empty(), "Code should remain empty after ETH transfer");
let slot0_final = provider.get_storage_at(future_contract_address, U256::ZERO).await?;
assert_eq!(slot0_final, U256::ZERO, "Storage should remain cleared");
Ok(())
}

View File

@@ -29,6 +29,19 @@ pub(crate) fn eth_payload_attributes(timestamp: u64) -> EthPayloadBuilderAttribu
EthPayloadBuilderAttributes::new(B256::ZERO, attributes)
}
/// Helper function to create pre-Cancun (Shanghai) payload attributes.
/// No `parent_beacon_block_root` field.
pub(crate) fn eth_payload_attributes_shanghai(timestamp: u64) -> EthPayloadBuilderAttributes {
let attributes = PayloadAttributes {
timestamp,
prev_randao: B256::ZERO,
suggested_fee_recipient: Address::ZERO,
withdrawals: Some(vec![]),
parent_beacon_block_root: None,
};
EthPayloadBuilderAttributes::new(B256::ZERO, attributes)
}
/// Advances node by producing blocks with random transactions.
pub(crate) async fn advance_with_random_transactions<Provider>(
node: &mut NodeHelperType<EthereumNode, Provider>,

View File

@@ -75,9 +75,11 @@ pub trait Executor<DB: Database>: Sized {
where
I: IntoIterator<Item = &'a RecoveredBlock<<Self::Primitives as NodePrimitives>::Block>>,
{
let mut results = Vec::new();
let blocks_iter = blocks.into_iter();
let capacity = blocks_iter.size_hint().0;
let mut results = Vec::with_capacity(capacity);
let mut first_block = None;
for block in blocks {
for block in blocks_iter {
if first_block.is_none() {
first_block = Some(block.header().number());
}

View File

@@ -35,7 +35,7 @@ use reth_execution_errors::BlockExecutionError;
use reth_primitives_traits::{
BlockTy, HeaderTy, NodePrimitives, ReceiptTy, SealedBlock, SealedHeader, TxTy,
};
use revm::{context::TxEnv, database::State};
use revm::{context::TxEnv, database::State, primitives::hardfork::SpecId};
pub mod either;
/// EVM environment configuration.
@@ -203,6 +203,7 @@ pub trait ConfigureEvm: Clone + Debug + Send + Sync + Unpin {
+ FromRecoveredTx<TxTy<Self::Primitives>>
+ FromTxWithEncoded<TxTy<Self::Primitives>>,
Precompiles = PrecompilesMap,
Spec: Into<SpecId>,
>,
>;

View File

@@ -66,13 +66,17 @@ use tokio::sync::mpsc::{Sender, UnboundedReceiver};
#[non_exhaustive]
pub struct TestPoolBuilder;
impl<Node> PoolBuilder<Node> for TestPoolBuilder
impl<Node, Evm: Send> PoolBuilder<Node, Evm> for TestPoolBuilder
where
Node: FullNodeTypes<Types: NodeTypes<Primitives: NodePrimitives<SignedTx = TransactionSigned>>>,
{
type Pool = TestPool;
async fn build_pool(self, _ctx: &BuilderContext<Node>) -> eyre::Result<Self::Pool> {
async fn build_pool(
self,
_ctx: &BuilderContext<Node>,
_evm_config: Evm,
) -> eyre::Result<Self::Pool> {
Ok(testing_pool())
}
}

View File

@@ -1631,7 +1631,7 @@ impl Discv4Service {
.filter(|entry| entry.node.value.is_expired())
.map(|n| n.node.value)
.collect::<Vec<_>>();
nodes.sort_by(|a, b| a.last_seen.cmp(&b.last_seen));
nodes.sort_by_key(|a| a.last_seen);
let to_ping = nodes.into_iter().map(|n| n.record).take(MAX_NODES_PING).collect::<Vec<_>>();
for node in to_ping {
self.try_ping(node, PingReason::RePing)

View File

@@ -14,6 +14,7 @@ workspace = true
[dependencies]
# reth
reth-chainspec.workspace = true
reth-evm-ethereum = { workspace = true, optional = true }
reth-fs-util.workspace = true
reth-primitives-traits.workspace = true
reth-net-banlist.workspace = true
@@ -136,6 +137,8 @@ test-utils = [
"reth-primitives-traits/test-utils",
"reth-provider/test-utils",
"reth-ethereum-primitives/test-utils",
"dep:reth-evm-ethereum",
"reth-evm-ethereum?/test-utils",
]
[[bench]]

View File

@@ -19,6 +19,7 @@ use reth_eth_wire::{
protocol::Protocol, DisconnectReason, EthNetworkPrimitives, HelloMessageWithProtocols,
};
use reth_ethereum_primitives::{PooledTransactionVariant, TransactionSigned};
use reth_evm_ethereum::EthEvmConfig;
use reth_network_api::{
events::{PeerEvent, SessionInfo},
test_utils::{PeersHandle, PeersHandleProvider},
@@ -182,17 +183,20 @@ where
C: ChainSpecProvider<ChainSpec: EthereumHardforks>
+ StateProviderFactory
+ BlockReaderIdExt
+ HeaderProvider
+ HeaderProvider<Header = alloy_consensus::Header>
+ Clone
+ 'static,
Pool: TransactionPool,
{
/// Installs an eth pool on each peer
pub fn with_eth_pool(self) -> Testnet<C, EthTransactionPool<C, InMemoryBlobStore>> {
pub fn with_eth_pool(
self,
) -> Testnet<C, EthTransactionPool<C, InMemoryBlobStore, EthEvmConfig>> {
self.map_pool(|peer| {
let blob_store = InMemoryBlobStore::default();
let pool = TransactionValidationTaskExecutor::eth(
peer.client.clone(),
EthEvmConfig::mainnet(),
blob_store.clone(),
TokioTaskExecutor::default(),
);
@@ -208,7 +212,7 @@ where
pub fn with_eth_pool_config(
self,
tx_manager_config: TransactionsManagerConfig,
) -> Testnet<C, EthTransactionPool<C, InMemoryBlobStore>> {
) -> Testnet<C, EthTransactionPool<C, InMemoryBlobStore, EthEvmConfig>> {
self.with_eth_pool_config_and_policy(tx_manager_config, Default::default())
}
@@ -217,11 +221,12 @@ where
self,
tx_manager_config: TransactionsManagerConfig,
policy: TransactionPropagationKind,
) -> Testnet<C, EthTransactionPool<C, InMemoryBlobStore>> {
) -> Testnet<C, EthTransactionPool<C, InMemoryBlobStore, EthEvmConfig>> {
self.map_pool(|peer| {
let blob_store = InMemoryBlobStore::default();
let pool = TransactionValidationTaskExecutor::eth(
peer.client.clone(),
EthEvmConfig::mainnet(),
blob_store.clone(),
TokioTaskExecutor::default(),
);

View File

@@ -188,13 +188,7 @@ impl<N: NetworkPrimitives> TransactionFetcher<N> {
let TxFetchMetadata { fallback_peers, .. } =
self.hashes_fetch_inflight_and_pending_fetch.peek(&hash)?;
for peer_id in fallback_peers.iter() {
if self.is_idle(peer_id) {
return Some(peer_id)
}
}
None
fallback_peers.iter().find(|peer_id| self.is_idle(peer_id))
}
/// Returns any idle peer for any hash pending fetch. If one is found, the corresponding

View File

@@ -20,6 +20,7 @@ use reth_network_p2p::{
};
use reth_network_peers::{mainnet_nodes, NodeRecord, TrustedPeer};
use reth_network_types::peers::config::PeerBackoffDurations;
use reth_provider::test_utils::MockEthProvider;
use reth_storage_api::noop::NoopProvider;
use reth_tracing::init_test_tracing;
use reth_transaction_pool::test_utils::testing_pool;
@@ -655,7 +656,8 @@ async fn new_random_peer(
async fn test_connect_many() {
reth_tracing::init_test_tracing();
let net = Testnet::create_with(5, NoopProvider::default()).await;
let provider = MockEthProvider::default().with_genesis_block();
let net = Testnet::create_with(5, provider).await;
// install request handlers
let net = net.with_eth_pool();

View File

@@ -22,7 +22,7 @@ use tokio::join;
async fn test_tx_gossip() {
reth_tracing::init_test_tracing();
let provider = MockEthProvider::default();
let provider = MockEthProvider::default().with_genesis_block();
let net = Testnet::create_with(2, provider.clone()).await;
// install request handlers
@@ -61,7 +61,7 @@ async fn test_tx_gossip() {
async fn test_tx_propagation_policy_trusted_only() {
reth_tracing::init_test_tracing();
let provider = MockEthProvider::default();
let provider = MockEthProvider::default().with_genesis_block();
let policy = TransactionPropagationKind::Trusted;
let net = Testnet::create_with(2, provider.clone()).await;
@@ -129,7 +129,7 @@ async fn test_tx_propagation_policy_trusted_only() {
async fn test_tx_ingress_policy_trusted_only() {
reth_tracing::init_test_tracing();
let provider = MockEthProvider::default();
let provider = MockEthProvider::default().with_genesis_block();
let tx_manager_config = TransactionsManagerConfig {
ingress_policy: TransactionIngressPolicy::Trusted,
@@ -195,7 +195,7 @@ async fn test_tx_ingress_policy_trusted_only() {
#[tokio::test(flavor = "multi_thread")]
async fn test_4844_tx_gossip_penalization() {
reth_tracing::init_test_tracing();
let provider = MockEthProvider::default();
let provider = MockEthProvider::default().with_genesis_block();
let net = Testnet::create_with(2, provider.clone()).await;
// install request handlers
@@ -246,7 +246,7 @@ async fn test_4844_tx_gossip_penalization() {
#[tokio::test(flavor = "multi_thread")]
async fn test_sending_invalid_transactions() {
reth_tracing::init_test_tracing();
let provider = MockEthProvider::default();
let provider = MockEthProvider::default().with_genesis_block();
let net = Testnet::create_with(2, provider.clone()).await;
// install request handlers
let net = net.with_eth_pool();

View File

@@ -571,8 +571,8 @@ where
debug!(target: "downloaders", %err, ?this.start_hash, "Body range download failed");
}
}
if this.bodies.is_empty() {
// received bad response, re-request headers
if this.request.bodies.is_none() && !this.is_bodies_complete() {
// no pending bodies request (e.g., request error), retry remaining bodies
// TODO: convert this into two futures, one which is a headers range
// future, and one which is a bodies range future.
//
@@ -751,8 +751,12 @@ mod tests {
use reth_ethereum_primitives::BlockBody;
use super::*;
use crate::test_utils::TestFullBlockClient;
use std::ops::Range;
use crate::{error::RequestError, test_utils::TestFullBlockClient};
use std::{
ops::Range,
sync::atomic::{AtomicUsize, Ordering},
};
use tokio::time::{timeout, Duration};
#[tokio::test]
async fn download_single_full_block() {
@@ -800,6 +804,65 @@ mod tests {
(sealed_header, body)
}
#[derive(Clone, Debug)]
struct FailingBodiesClient {
inner: TestFullBlockClient,
fail_on: usize,
body_requests: Arc<AtomicUsize>,
}
impl FailingBodiesClient {
fn new(inner: TestFullBlockClient, fail_on: usize) -> Self {
Self { inner, fail_on, body_requests: Arc::new(AtomicUsize::new(0)) }
}
}
impl DownloadClient for FailingBodiesClient {
fn report_bad_message(&self, peer_id: PeerId) {
self.inner.report_bad_message(peer_id);
}
fn num_connected_peers(&self) -> usize {
self.inner.num_connected_peers()
}
}
impl HeadersClient for FailingBodiesClient {
type Header = <TestFullBlockClient as HeadersClient>::Header;
type Output = <TestFullBlockClient as HeadersClient>::Output;
fn get_headers_with_priority(
&self,
request: HeadersRequest,
priority: Priority,
) -> Self::Output {
self.inner.get_headers_with_priority(request, priority)
}
}
impl BodiesClient for FailingBodiesClient {
type Body = <TestFullBlockClient as BodiesClient>::Body;
type Output = <TestFullBlockClient as BodiesClient>::Output;
fn get_block_bodies_with_priority_and_range_hint(
&self,
hashes: Vec<B256>,
priority: Priority,
range_hint: Option<RangeInclusive<u64>>,
) -> Self::Output {
let attempt = self.body_requests.fetch_add(1, Ordering::SeqCst);
if attempt == self.fail_on {
return futures::future::ready(Err(RequestError::Timeout))
}
self.inner.get_block_bodies_with_priority_and_range_hint(hashes, priority, range_hint)
}
}
impl BlockClient for FailingBodiesClient {
type Block = reth_ethereum_primitives::Block;
}
#[tokio::test]
async fn download_full_block_range() {
let client = TestFullBlockClient::default();
@@ -837,6 +900,25 @@ mod tests {
}
}
#[tokio::test]
async fn download_full_block_range_retries_after_body_error() {
let mut client = TestFullBlockClient::default();
client.set_soft_limit(2);
let (header, _) = insert_headers_into_client(&client, 0..3);
let client = FailingBodiesClient::new(client, 1);
let body_requests = Arc::clone(&client.body_requests);
let client = FullBlockClient::test_client(client);
let received =
timeout(Duration::from_secs(1), client.get_full_block_range(header.hash(), 3))
.await
.expect("body request retry should complete");
assert_eq!(received.len(), 3);
assert_eq!(body_requests.load(Ordering::SeqCst), 3);
}
#[tokio::test]
async fn download_full_block_range_with_invalid_header() {
let client = TestFullBlockClient::default();

View File

@@ -62,12 +62,12 @@ impl<Node, PoolB, PayloadB, NetworkB, ExecB, ConsB>
pool_builder,
payload_builder,
network_builder,
executor_builder: evm_builder,
executor_builder,
consensus_builder,
_marker,
} = self;
ComponentsBuilder {
executor_builder: evm_builder,
executor_builder,
pool_builder,
payload_builder,
network_builder,
@@ -149,15 +149,12 @@ where
pub fn pool<PB>(
self,
pool_builder: PB,
) -> ComponentsBuilder<Node, PB, PayloadB, NetworkB, ExecB, ConsB>
where
PB: PoolBuilder<Node>,
{
) -> ComponentsBuilder<Node, PB, PayloadB, NetworkB, ExecB, ConsB> {
let Self {
pool_builder: _,
payload_builder,
network_builder,
executor_builder: evm_builder,
executor_builder,
consensus_builder,
_marker,
} = self;
@@ -165,7 +162,7 @@ where
pool_builder,
payload_builder,
network_builder,
executor_builder: evm_builder,
executor_builder,
consensus_builder,
_marker,
}
@@ -185,72 +182,6 @@ where
_marker: self._marker,
}
}
}
impl<Node, PoolB, PayloadB, NetworkB, ExecB, ConsB>
ComponentsBuilder<Node, PoolB, PayloadB, NetworkB, ExecB, ConsB>
where
Node: FullNodeTypes,
PoolB: PoolBuilder<Node>,
{
/// Configures the network builder.
///
/// This accepts a [`NetworkBuilder`] instance that will be used to create the node's network
/// stack.
pub fn network<NB>(
self,
network_builder: NB,
) -> ComponentsBuilder<Node, PoolB, PayloadB, NB, ExecB, ConsB>
where
NB: NetworkBuilder<Node, PoolB::Pool>,
{
let Self {
pool_builder,
payload_builder,
network_builder: _,
executor_builder: evm_builder,
consensus_builder,
_marker,
} = self;
ComponentsBuilder {
pool_builder,
payload_builder,
network_builder,
executor_builder: evm_builder,
consensus_builder,
_marker,
}
}
/// Configures the payload builder.
///
/// This accepts a [`PayloadServiceBuilder`] instance that will be used to create the node's
/// payload builder service.
pub fn payload<PB>(
self,
payload_builder: PB,
) -> ComponentsBuilder<Node, PoolB, PB, NetworkB, ExecB, ConsB>
where
ExecB: ExecutorBuilder<Node>,
PB: PayloadServiceBuilder<Node, PoolB::Pool, ExecB::EVM>,
{
let Self {
pool_builder,
payload_builder: _,
network_builder,
executor_builder: evm_builder,
consensus_builder,
_marker,
} = self;
ComponentsBuilder {
pool_builder,
payload_builder,
network_builder,
executor_builder: evm_builder,
consensus_builder,
_marker,
}
}
/// Configures the executor builder.
///
@@ -298,7 +229,72 @@ where
network_builder,
executor_builder,
consensus_builder: _,
_marker,
} = self;
ComponentsBuilder {
pool_builder,
payload_builder,
network_builder,
executor_builder,
consensus_builder,
_marker,
}
}
}
impl<Node, PoolB, PayloadB, NetworkB, ExecB, ConsB>
ComponentsBuilder<Node, PoolB, PayloadB, NetworkB, ExecB, ConsB>
where
Node: FullNodeTypes,
ExecB: ExecutorBuilder<Node>,
PoolB: PoolBuilder<Node, ExecB::EVM>,
{
/// Configures the network builder.
///
/// This accepts a [`NetworkBuilder`] instance that will be used to create the node's network
/// stack.
pub fn network<NB>(
self,
network_builder: NB,
) -> ComponentsBuilder<Node, PoolB, PayloadB, NB, ExecB, ConsB>
where
NB: NetworkBuilder<Node, PoolB::Pool>,
{
let Self {
pool_builder,
payload_builder,
network_builder: _,
executor_builder,
consensus_builder,
_marker,
} = self;
ComponentsBuilder {
pool_builder,
payload_builder,
network_builder,
executor_builder,
consensus_builder,
_marker,
}
}
/// Configures the payload builder.
///
/// This accepts a [`PayloadServiceBuilder`] instance that will be used to create the node's
/// payload builder service.
pub fn payload<PB>(
self,
payload_builder: PB,
) -> ComponentsBuilder<Node, PoolB, PB, NetworkB, ExecB, ConsB>
where
PB: PayloadServiceBuilder<Node, PoolB::Pool, ExecB::EVM>,
{
let Self {
pool_builder,
payload_builder: _,
network_builder,
executor_builder,
consensus_builder,
_marker,
} = self;
ComponentsBuilder {
@@ -358,7 +354,7 @@ impl<Node, PoolB, PayloadB, NetworkB, ExecB, ConsB> NodeComponentsBuilder<Node>
for ComponentsBuilder<Node, PoolB, PayloadB, NetworkB, ExecB, ConsB>
where
Node: FullNodeTypes,
PoolB: PoolBuilder<Node, Pool: TransactionPool>,
PoolB: PoolBuilder<Node, ExecB::EVM, Pool: TransactionPool>,
NetworkB: NetworkBuilder<
Node,
PoolB::Pool,
@@ -384,13 +380,13 @@ where
pool_builder,
payload_builder,
network_builder,
executor_builder: evm_builder,
executor_builder,
consensus_builder,
_marker,
} = self;
let evm_config = evm_builder.build_evm(context).await?;
let pool = pool_builder.build_pool(context).await?;
let evm_config = executor_builder.build_evm(context).await?;
let pool = pool_builder.build_pool(context, evm_config.clone()).await?;
let network = network_builder.build_network(context, pool.clone()).await?;
let payload_builder_handle = payload_builder
.spawn_payload_builder_service(context, pool.clone(), evm_config.clone())
@@ -471,14 +467,19 @@ where
#[derive(Debug, Clone)]
pub struct NoopTransactionPoolBuilder<Tx = EthPooledTransaction>(PhantomData<Tx>);
impl<N, Tx> PoolBuilder<N> for NoopTransactionPoolBuilder<Tx>
impl<N, Tx, Evm> PoolBuilder<N, Evm> for NoopTransactionPoolBuilder<Tx>
where
N: FullNodeTypes,
Tx: EthPoolTransaction<Consensus = TxTy<N::Types>> + Unpin,
Evm: Send,
{
type Pool = NoopTransactionPool<Tx>;
async fn build_pool(self, _ctx: &BuilderContext<N>) -> eyre::Result<Self::Pool> {
async fn build_pool(
self,
_ctx: &BuilderContext<N>,
_evm_config: Evm,
) -> eyre::Result<Self::Pool> {
Ok(NoopTransactionPool::<Tx>::new())
}
}

View File

@@ -12,7 +12,7 @@ use reth_transaction_pool::{
use std::{collections::HashSet, future::Future};
/// A type that knows how to build the transaction pool.
pub trait PoolBuilder<Node: FullNodeTypes>: Send {
pub trait PoolBuilder<Node: FullNodeTypes, Evm>: Send {
/// The transaction pool to build.
type Pool: TransactionPool<Transaction: PoolTransaction<Consensus = TxTy<Node::Types>>>
+ Unpin
@@ -22,16 +22,17 @@ pub trait PoolBuilder<Node: FullNodeTypes>: Send {
fn build_pool(
self,
ctx: &BuilderContext<Node>,
evm_config: Evm,
) -> impl Future<Output = eyre::Result<Self::Pool>> + Send;
}
impl<Node, F, Fut, Pool> PoolBuilder<Node> for F
impl<Node, F, Fut, Pool, Evm> PoolBuilder<Node, Evm> for F
where
Node: FullNodeTypes,
Pool: TransactionPool<Transaction: PoolTransaction<Consensus = TxTy<Node::Types>>>
+ Unpin
+ 'static,
F: FnOnce(&BuilderContext<Node>) -> Fut + Send,
F: FnOnce(&BuilderContext<Node>, Evm) -> Fut + Send,
Fut: Future<Output = eyre::Result<Pool>> + Send,
{
type Pool = Pool;
@@ -39,8 +40,9 @@ where
fn build_pool(
self,
ctx: &BuilderContext<Node>,
evm_config: Evm,
) -> impl Future<Output = eyre::Result<Self::Pool>> {
self(ctx)
self(ctx, evm_config)
}
}

View File

@@ -22,9 +22,8 @@ pub struct DefaultEngineValues {
legacy_state_root_task_enabled: bool,
state_cache_disabled: bool,
prewarming_disabled: bool,
parallel_sparse_trie_disabled: bool,
state_provider_metrics: bool,
cross_block_cache_size: u64,
cross_block_cache_size: usize,
state_root_task_compare_updates: bool,
accept_execution_requests_hash: bool,
multiproof_chunking_enabled: bool,
@@ -81,12 +80,6 @@ impl DefaultEngineValues {
self
}
/// Set whether to disable parallel sparse trie by default
pub const fn with_parallel_sparse_trie_disabled(mut self, v: bool) -> Self {
self.parallel_sparse_trie_disabled = v;
self
}
/// Set whether to enable state provider metrics by default
pub const fn with_state_provider_metrics(mut self, v: bool) -> Self {
self.state_provider_metrics = v;
@@ -94,7 +87,7 @@ impl DefaultEngineValues {
}
/// Set the default cross-block cache size in MB
pub const fn with_cross_block_cache_size(mut self, v: u64) -> Self {
pub const fn with_cross_block_cache_size(mut self, v: usize) -> Self {
self.cross_block_cache_size = v;
self
}
@@ -189,7 +182,6 @@ impl Default for DefaultEngineValues {
legacy_state_root_task_enabled: false,
state_cache_disabled: false,
prewarming_disabled: false,
parallel_sparse_trie_disabled: false,
state_provider_metrics: false,
cross_block_cache_size: DEFAULT_CROSS_BLOCK_CACHE_SIZE_MB,
state_root_task_compare_updates: false,
@@ -244,14 +236,14 @@ pub struct EngineArgs {
#[arg(long = "engine.disable-prewarming", alias = "engine.disable-caching-and-prewarming", default_value_t = DefaultEngineValues::get_global().prewarming_disabled)]
pub prewarming_disabled: bool,
/// CAUTION: This CLI flag has no effect anymore, use --engine.disable-parallel-sparse-trie
/// if you want to disable usage of the `ParallelSparseTrie`.
/// CAUTION: This CLI flag has no effect anymore. The parallel sparse trie is always enabled.
#[deprecated]
#[arg(long = "engine.parallel-sparse-trie", default_value = "true", hide = true)]
pub parallel_sparse_trie_enabled: bool,
/// Disable the parallel sparse trie in the engine.
#[arg(long = "engine.disable-parallel-sparse-trie", default_value_t = DefaultEngineValues::get_global().parallel_sparse_trie_disabled)]
/// CAUTION: This CLI flag has no effect anymore. The parallel sparse trie is always enabled.
#[deprecated]
#[arg(long = "engine.disable-parallel-sparse-trie", default_value = "false", hide = true)]
pub parallel_sparse_trie_disabled: bool,
/// Enable state provider latency metrics. This allows the engine to collect and report stats
@@ -262,7 +254,7 @@ pub struct EngineArgs {
/// Configure the size of cross-block cache in megabytes
#[arg(long = "engine.cross-block-cache-size", default_value_t = DefaultEngineValues::get_global().cross_block_cache_size)]
pub cross_block_cache_size: u64,
pub cross_block_cache_size: usize,
/// Enable comparing trie updates from the state root task to the trie updates from the regular
/// state root calculation.
@@ -343,7 +335,6 @@ impl Default for EngineArgs {
legacy_state_root_task_enabled,
state_cache_disabled,
prewarming_disabled,
parallel_sparse_trie_disabled,
state_provider_metrics,
cross_block_cache_size,
state_root_task_compare_updates,
@@ -369,7 +360,7 @@ impl Default for EngineArgs {
state_cache_disabled,
prewarming_disabled,
parallel_sparse_trie_enabled: true,
parallel_sparse_trie_disabled,
parallel_sparse_trie_disabled: false,
state_provider_metrics,
cross_block_cache_size,
accept_execution_requests_hash,
@@ -398,7 +389,6 @@ impl EngineArgs {
.with_legacy_state_root(self.legacy_state_root_task_enabled)
.without_state_cache(self.state_cache_disabled)
.without_prewarming(self.prewarming_disabled)
.with_disable_parallel_sparse_trie(self.parallel_sparse_trie_disabled)
.with_state_provider_metrics(self.state_provider_metrics)
.with_always_compare_trie_updates(self.state_root_task_compare_updates)
.with_cross_block_cache_size(self.cross_block_cache_size * 1024 * 1024)
@@ -457,7 +447,7 @@ mod tests {
state_cache_disabled: true,
prewarming_disabled: true,
parallel_sparse_trie_enabled: true,
parallel_sparse_trie_disabled: true,
parallel_sparse_trie_disabled: false,
state_provider_metrics: true,
cross_block_cache_size: 256,
state_root_task_compare_updates: true,
@@ -485,7 +475,6 @@ mod tests {
"--engine.legacy-state-root",
"--engine.disable-state-cache",
"--engine.disable-prewarming",
"--engine.disable-parallel-sparse-trie",
"--engine.state-provider-metrics",
"--engine.cross-block-cache-size",
"256",

View File

@@ -1025,6 +1025,7 @@ mod tests {
max_receipts: 2000,
max_headers: 1000,
max_concurrent_db_requests: 512,
max_cached_tx_hashes: 30_000,
},
gas_price_oracle: GasPriceOracleArgs {
blocks: 20,

View File

@@ -1,7 +1,7 @@
use clap::Args;
use reth_rpc_server_types::constants::cache::{
DEFAULT_BLOCK_CACHE_MAX_LEN, DEFAULT_CONCURRENT_DB_REQUESTS, DEFAULT_HEADER_CACHE_MAX_LEN,
DEFAULT_RECEIPT_CACHE_MAX_LEN,
DEFAULT_MAX_CACHED_TX_HASHES, DEFAULT_RECEIPT_CACHE_MAX_LEN,
};
/// Parameters to configure RPC state cache.
@@ -36,6 +36,13 @@ pub struct RpcStateCacheArgs {
default_value_t = DEFAULT_CONCURRENT_DB_REQUESTS,
)]
pub max_concurrent_db_requests: usize,
/// Maximum number of transaction hashes to cache for transaction lookups.
#[arg(
long = "rpc-cache.max-cached-tx-hashes",
default_value_t = DEFAULT_MAX_CACHED_TX_HASHES,
)]
pub max_cached_tx_hashes: u32,
}
impl RpcStateCacheArgs {
@@ -54,6 +61,7 @@ impl Default for RpcStateCacheArgs {
max_receipts: DEFAULT_RECEIPT_CACHE_MAX_LEN,
max_headers: DEFAULT_HEADER_CACHE_MAX_LEN,
max_concurrent_db_requests: DEFAULT_CONCURRENT_DB_REQUESTS,
max_cached_tx_hashes: DEFAULT_MAX_CACHED_TX_HASHES,
}
}
}

View File

@@ -39,7 +39,7 @@ pub use reth_engine_primitives::{
};
/// Default size of cross-block cache in megabytes.
pub const DEFAULT_CROSS_BLOCK_CACHE_SIZE_MB: u64 = 4 * 1024;
pub const DEFAULT_CROSS_BLOCK_CACHE_SIZE_MB: usize = 4 * 1024;
/// This includes all necessary configuration to launch the node.
/// The individual configuration options can be overwritten before launching the node.

View File

@@ -257,6 +257,11 @@ fn describe_rocksdb_metrics() {
Unit::Bytes,
"The size of memtables for a RocksDB table"
);
describe_gauge!(
"rocksdb.wal_size",
Unit::Bytes,
"The total size of WAL (Write-Ahead Log) files. Important: this is not included in table_size or sst_size metrics"
);
}
#[cfg(all(feature = "jemalloc", unix))]

View File

@@ -16,7 +16,7 @@ use reth_network::{
PeersInfo,
};
use reth_node_api::{
AddOnsContext, BlockTy, BuildNextEnv, EngineTypes, FullNodeComponents, HeaderTy, NodeAddOns,
AddOnsContext, BuildNextEnv, EngineTypes, FullNodeComponents, HeaderTy, NodeAddOns,
NodePrimitives, PayloadAttributesBuilder, PayloadTypes, PrimitivesTy, TxTy,
};
use reth_node_builder::{
@@ -165,6 +165,7 @@ impl OpNode {
self.args;
ComponentsBuilder::default()
.node_types::<Node>()
.executor(OpExecutorBuilder::default())
.pool(
OpPoolBuilder::default()
.with_enable_tx_conditional(self.args.enable_tx_conditional)
@@ -173,7 +174,6 @@ impl OpNode {
self.args.supervisor_safety_level,
),
)
.executor(OpExecutorBuilder::default())
.payload(BasicPayloadServiceBuilder::new(
OpPayloadBuilder::new(compute_pending_block)
.with_da_config(self.da_config.clone())
@@ -957,14 +957,19 @@ impl<T> OpPoolBuilder<T> {
}
}
impl<Node, T> PoolBuilder<Node> for OpPoolBuilder<T>
impl<Node, T, Evm> PoolBuilder<Node, Evm> for OpPoolBuilder<T>
where
Node: FullNodeTypes<Types: NodeTypes<ChainSpec: OpHardforks>>,
T: EthPoolTransaction<Consensus = TxTy<Node::Types>> + OpPooledTx,
Evm: ConfigureEvm<Primitives = PrimitivesTy<Node::Types>> + Clone + 'static,
{
type Pool = OpTransactionPool<Node::Provider, DiskFileBlobStore, T, BlockTy<Node::Types>>;
type Pool = OpTransactionPool<Node::Provider, DiskFileBlobStore, Evm, T>;
async fn build_pool(self, ctx: &BuilderContext<Node>) -> eyre::Result<Self::Pool> {
async fn build_pool(
self,
ctx: &BuilderContext<Node>,
evm_config: Evm,
) -> eyre::Result<Self::Pool> {
let Self { pool_config_overrides, .. } = self;
// supervisor used for interop
@@ -982,27 +987,27 @@ where
.await;
let blob_store = reth_node_builder::components::create_blob_store(ctx)?;
let validator = TransactionValidationTaskExecutor::eth_builder(ctx.provider().clone())
.no_eip4844()
.with_head_timestamp(ctx.head().timestamp)
.with_max_tx_input_bytes(ctx.config().txpool.max_tx_input_bytes)
.kzg_settings(ctx.kzg_settings()?)
.set_tx_fee_cap(ctx.config().rpc.rpc_tx_fee_cap)
.with_max_tx_gas_limit(ctx.config().txpool.max_tx_gas_limit)
.with_minimum_priority_fee(ctx.config().txpool.minimum_priority_fee)
.with_additional_tasks(
pool_config_overrides
.additional_validation_tasks
.unwrap_or_else(|| ctx.config().txpool.additional_validation_tasks),
)
.build_with_tasks(ctx.task_executor().clone(), blob_store.clone())
.map(|validator| {
OpTransactionValidator::new(validator)
// In --dev mode we can't require gas fees because we're unable to decode
// the L1 block info
.require_l1_data_gas_fee(!ctx.config().dev.dev)
.with_supervisor(supervisor_client.clone())
});
let validator =
TransactionValidationTaskExecutor::eth_builder(ctx.provider().clone(), evm_config)
.no_eip4844()
.with_max_tx_input_bytes(ctx.config().txpool.max_tx_input_bytes)
.kzg_settings(ctx.kzg_settings()?)
.set_tx_fee_cap(ctx.config().rpc.rpc_tx_fee_cap)
.with_max_tx_gas_limit(ctx.config().txpool.max_tx_gas_limit)
.with_minimum_priority_fee(ctx.config().txpool.minimum_priority_fee)
.with_additional_tasks(
pool_config_overrides
.additional_validation_tasks
.unwrap_or_else(|| ctx.config().txpool.additional_validation_tasks),
)
.build_with_tasks(ctx.task_executor().clone(), blob_store.clone())
.map(|validator| {
OpTransactionValidator::new(validator)
// In --dev mode we can't require gas fees because we're unable to decode
// the L1 block info
.require_l1_data_gas_fee(!ctx.config().dev.dev)
.with_supervisor(supervisor_client.clone())
});
let final_pool_config = pool_config_overrides.apply(ctx.pool_config());

View File

@@ -52,9 +52,9 @@
//! ComponentsBuilder::default()
//! .node_types::<RethFullAdapter<_, OpNode>>()
//! .noop_pool::<OpPooledTransaction>()
//! .noop_network::<OpNetworkPrimitives>()
//! .noop_consensus()
//! .executor(OpExecutorBuilder::default())
//! .noop_consensus()
//! .noop_network::<OpNetworkPrimitives>()
//! .noop_payload(),
//! Box::new(()) as Box<dyn OnComponentInitializedHook<_>>,
//! )

View File

@@ -155,6 +155,10 @@ impl IsTyped2718 for OpTransactionSigned {
}
impl SignedTransaction for OpTransactionSigned {
fn is_system_tx(&self) -> bool {
self.is_deposit()
}
fn recalculate_hash(&self) -> B256 {
keccak256(self.encoded_2718())
}

View File

@@ -23,6 +23,7 @@ alloy-serde.workspace = true
# reth
reth-chainspec.workspace = true
reth-evm.workspace = true
reth-primitives-traits.workspace = true
reth-chain-state.workspace = true
reth-storage-api.workspace = true

View File

@@ -9,7 +9,6 @@
#![cfg_attr(docsrs, feature(doc_cfg))]
mod validator;
use op_alloy_consensus::OpBlock;
pub use validator::{OpL1BlockInfo, OpTransactionValidator};
pub mod conditional;
@@ -25,8 +24,8 @@ pub mod estimated_da_size;
use reth_transaction_pool::{CoinbaseTipOrdering, Pool, TransactionValidationTaskExecutor};
/// Type alias for default optimism transaction pool
pub type OpTransactionPool<Client, S, T = OpPooledTransaction, B = OpBlock> = Pool<
TransactionValidationTaskExecutor<OpTransactionValidator<Client, T, B>>,
pub type OpTransactionPool<Client, S, Evm, T = OpPooledTransaction> = Pool<
TransactionValidationTaskExecutor<OpTransactionValidator<Client, T, Evm>>,
CoinbaseTipOrdering<T>,
S,
>;

View File

@@ -316,7 +316,8 @@ mod tests {
use alloy_primitives::{TxKind, U256};
use op_alloy_consensus::TxDeposit;
use reth_optimism_chainspec::OP_MAINNET;
use reth_optimism_primitives::OpTransactionSigned;
use reth_optimism_evm::OpEvmConfig;
use reth_optimism_primitives::{OpPrimitives, OpTransactionSigned};
use reth_provider::test_utils::MockEthProvider;
use reth_transaction_pool::{
blobstore::InMemoryBlobStore, validate::EthTransactionValidatorBuilder, TransactionOrigin,
@@ -324,12 +325,14 @@ mod tests {
};
#[tokio::test]
async fn validate_optimism_transaction() {
let client = MockEthProvider::default().with_chain_spec(OP_MAINNET.clone());
let validator =
EthTransactionValidatorBuilder::new(client)
.no_shanghai()
.no_cancun()
.build::<_, _, reth_optimism_primitives::OpBlock>(InMemoryBlobStore::default());
let client = MockEthProvider::<OpPrimitives>::new()
.with_chain_spec(OP_MAINNET.clone())
.with_genesis_block();
let evm_config = OpEvmConfig::optimism(OP_MAINNET.clone());
let validator = EthTransactionValidatorBuilder::new(client, evm_config)
.no_shanghai()
.no_cancun()
.build(InMemoryBlobStore::default());
let validator = OpTransactionValidator::new(validator);
let origin = TransactionOrigin::External;

View File

@@ -1,13 +1,14 @@
use crate::{supervisor::SupervisorClient, InvalidCrossTx, OpPooledTx};
use alloy_consensus::{BlockHeader, Transaction};
use op_alloy_consensus::OpBlock;
use op_revm::L1BlockInfo;
use parking_lot::RwLock;
use reth_chainspec::ChainSpecProvider;
use reth_evm::ConfigureEvm;
use reth_optimism_evm::RethL1BlockInfo;
use reth_optimism_forks::OpHardforks;
use reth_primitives_traits::{
transaction::error::InvalidTransactionError, Block, BlockBody, GotExpected, SealedBlock,
transaction::error::InvalidTransactionError, Block, BlockBody, BlockTy, GotExpected,
SealedBlock,
};
use reth_storage_api::{AccountInfoReader, BlockReaderIdExt, StateProviderFactory};
use reth_transaction_pool::{
@@ -40,9 +41,9 @@ impl OpL1BlockInfo {
/// Validator for Optimism transactions.
#[derive(Debug, Clone)]
pub struct OpTransactionValidator<Client, Tx, B = OpBlock> {
pub struct OpTransactionValidator<Client, Tx, Evm> {
/// The type that performs the actual validation.
inner: Arc<EthTransactionValidator<Client, Tx, B>>,
inner: Arc<EthTransactionValidator<Client, Tx, Evm>>,
/// Additional block info required for validation.
block_info: Arc<OpL1BlockInfo>,
/// If true, ensure that the transaction's sender has enough balance to cover the L1 gas fee
@@ -55,7 +56,7 @@ pub struct OpTransactionValidator<Client, Tx, B = OpBlock> {
fork_tracker: Arc<OpForkTracker>,
}
impl<Client, Tx, B: Block> OpTransactionValidator<Client, Tx, B> {
impl<Client, Tx, Evm> OpTransactionValidator<Client, Tx, Evm> {
/// Returns the configured chain spec
pub fn chain_spec(&self) -> Arc<Client::ChainSpec>
where
@@ -87,15 +88,15 @@ impl<Client, Tx, B: Block> OpTransactionValidator<Client, Tx, B> {
}
}
impl<Client, Tx, B> OpTransactionValidator<Client, Tx, B>
impl<Client, Tx, Evm> OpTransactionValidator<Client, Tx, Evm>
where
Client:
ChainSpecProvider<ChainSpec: OpHardforks> + StateProviderFactory + BlockReaderIdExt + Sync,
Tx: EthPoolTransaction + OpPooledTx,
B: Block,
Evm: ConfigureEvm,
{
/// Create a new [`OpTransactionValidator`].
pub fn new(inner: EthTransactionValidator<Client, Tx, B>) -> Self {
pub fn new(inner: EthTransactionValidator<Client, Tx, Evm>) -> Self {
let this = Self::with_block_info(inner, OpL1BlockInfo::default());
if let Ok(Some(block)) =
this.inner.client().block_by_number_or_tag(alloy_eips::BlockNumberOrTag::Latest)
@@ -114,7 +115,7 @@ where
/// Create a new [`OpTransactionValidator`] with the given [`OpL1BlockInfo`].
pub fn with_block_info(
inner: EthTransactionValidator<Client, Tx, B>,
inner: EthTransactionValidator<Client, Tx, Evm>,
block_info: OpL1BlockInfo,
) -> Self {
Self {
@@ -290,15 +291,15 @@ where
}
}
impl<Client, Tx, B> TransactionValidator for OpTransactionValidator<Client, Tx, B>
impl<Client, Tx, Evm> TransactionValidator for OpTransactionValidator<Client, Tx, Evm>
where
Client:
ChainSpecProvider<ChainSpec: OpHardforks> + StateProviderFactory + BlockReaderIdExt + Sync,
Tx: EthPoolTransaction + OpPooledTx,
B: Block,
Evm: ConfigureEvm,
{
type Transaction = Tx;
type Block = B;
type Block = BlockTy<Evm::Primitives>;
async fn validate_transaction(
&self,

View File

@@ -798,12 +798,14 @@ mod rpc_compat {
.zip(senders)
.enumerate()
.map(|(idx, (tx, sender))| {
#[allow(clippy::needless_update)]
let tx_info = TransactionInfo {
hash: Some(*tx.tx_hash()),
block_hash,
block_number: Some(block_number),
base_fee,
index: Some(idx as u64),
..Default::default()
};
converter(Recovered::new_unchecked(tx, sender), tx_info)

View File

@@ -48,6 +48,16 @@ pub trait SignedTransaction:
+ TxHashRef
+ IsTyped2718
{
/// Returns whether this is a system transaction.
///
/// System transactions are created at the protocol level rather than by users. They are
/// typically used by L2s for special purposes (e.g., Optimism deposit transactions with type
/// 126) and may have different validation rules or fee handling compared to standard
/// user-initiated transactions.
fn is_system_tx(&self) -> bool {
false
}
/// Returns whether this transaction type can be __broadcasted__ as full transaction over the
/// network.
///

View File

@@ -17,6 +17,7 @@ reth-exex-types.workspace = true
reth-db-api.workspace = true
reth-errors.workspace = true
reth-provider.workspace = true
reth-storage-api.workspace = true
reth-tokio-util.workspace = true
reth-config.workspace = true
reth-prune-types.workspace = true

View File

@@ -10,6 +10,7 @@ use reth_provider::{
StageCheckpointReader, StaticFileProviderFactory, StorageSettingsCache,
};
use reth_prune_types::PruneModes;
use reth_storage_api::{ChangeSetReader, StorageChangeSetReader};
use std::time::Duration;
use tokio::sync::watch;
@@ -82,6 +83,8 @@ impl PrunerBuilder {
+ ChainStateBlockReader
+ StorageSettingsCache
+ StageCheckpointReader
+ ChangeSetReader
+ StorageChangeSetReader
+ StaticFileProviderFactory<
Primitives: NodePrimitives<SignedTx: Value, Receipt: Value, BlockHeader: Value>,
>,
@@ -116,7 +119,9 @@ impl PrunerBuilder {
+ PruneCheckpointWriter
+ PruneCheckpointReader
+ StorageSettingsCache
+ StageCheckpointReader,
+ StageCheckpointReader
+ ChangeSetReader
+ StorageChangeSetReader,
{
let segments = SegmentSet::<Provider>::from_components(static_file_provider, self.segments);

View File

@@ -10,6 +10,7 @@ use reth_provider::{
PruneCheckpointReader, PruneCheckpointWriter, StaticFileProviderFactory, StorageSettingsCache,
};
use reth_prune_types::PruneModes;
use reth_storage_api::{ChangeSetReader, StorageChangeSetReader};
/// Collection of [`Segment`]. Thread-safe, allocated on the heap.
#[derive(Debug)]
@@ -52,7 +53,9 @@ where
+ PruneCheckpointReader
+ BlockReader<Transaction: Encodable2718>
+ ChainStateBlockReader
+ StorageSettingsCache,
+ StorageSettingsCache
+ ChangeSetReader
+ StorageChangeSetReader,
{
/// Creates a [`SegmentSet`] from an existing components, such as [`StaticFileProvider`] and
/// [`PruneModes`].

View File

@@ -1,14 +1,22 @@
use crate::{
db_ext::DbTxPruneExt,
segments::{user::history::prune_history_indices, PruneInput, Segment},
segments::{
user::history::{finalize_history_prune, HistoryPruneResult},
PruneInput, Segment,
},
PrunerError,
};
use itertools::Itertools;
use alloy_primitives::BlockNumber;
use reth_db_api::{models::ShardedKey, tables, transaction::DbTxMut};
use reth_provider::DBProvider;
use reth_provider::{
changeset_walker::StaticFileAccountChangesetWalker, DBProvider, EitherWriter,
StaticFileProviderFactory, StorageSettingsCache,
};
use reth_prune_types::{
PruneMode, PrunePurpose, PruneSegment, SegmentOutput, SegmentOutputCheckpoint,
};
use reth_static_file_types::StaticFileSegment;
use reth_storage_api::ChangeSetReader;
use rustc_hash::FxHashMap;
use tracing::{instrument, trace};
@@ -31,7 +39,10 @@ impl AccountHistory {
impl<Provider> Segment<Provider> for AccountHistory
where
Provider: DBProvider<Tx: DbTxMut>,
Provider: DBProvider<Tx: DbTxMut>
+ StaticFileProviderFactory
+ StorageSettingsCache
+ ChangeSetReader,
{
fn segment(&self) -> PruneSegment {
PruneSegment::AccountHistory
@@ -56,11 +67,33 @@ where
};
let range_end = *range.end();
// Check where account changesets are stored
if EitherWriter::account_changesets_destination(provider).is_static_file() {
self.prune_static_files(provider, input, range, range_end)
} else {
self.prune_database(provider, input, range, range_end)
}
}
}
impl AccountHistory {
/// Prunes account history when changesets are stored in static files.
fn prune_static_files<Provider>(
&self,
provider: &Provider,
input: PruneInput,
range: std::ops::RangeInclusive<BlockNumber>,
range_end: BlockNumber,
) -> Result<SegmentOutput, PrunerError>
where
Provider: DBProvider<Tx: DbTxMut> + StaticFileProviderFactory + ChangeSetReader,
{
let mut limiter = if let Some(limit) = input.limiter.deleted_entries_limit() {
input.limiter.set_deleted_entries_limit(limit / ACCOUNT_HISTORY_TABLES_TO_PRUNE)
} else {
input.limiter
};
if limiter.is_limit_reached() {
return Ok(SegmentOutput::not_done(
limiter.interrupt_reason(),
@@ -68,15 +101,86 @@ where
))
}
// The size of this map it's limited by `prune_delete_limit * blocks_since_last_run /
// ACCOUNT_HISTORY_TABLES_TO_PRUNE`, and with the current defaults it's usually `3500 * 5 /
// 2`, so 8750 entries. Each entry is `160 bit + 64 bit`, so the total size should be up to
// ~0.25MB + some hashmap overhead. `blocks_since_last_run` is additionally limited by the
// `max_reorg_depth`, so no OOM is expected here.
let mut highest_deleted_accounts = FxHashMap::default();
let mut last_changeset_pruned_block = None;
let mut pruned_changesets = 0;
let mut done = true;
let walker = StaticFileAccountChangesetWalker::new(provider, range);
for result in walker {
if limiter.is_limit_reached() {
done = false;
break;
}
let (block_number, changeset) = result?;
highest_deleted_accounts.insert(changeset.address, block_number);
last_changeset_pruned_block = Some(block_number);
pruned_changesets += 1;
limiter.increment_deleted_entries_count();
}
// Delete static file jars below the pruned block
if let Some(last_block) = last_changeset_pruned_block {
provider
.static_file_provider()
.delete_segment_below_block(StaticFileSegment::AccountChangeSets, last_block + 1)?;
}
trace!(target: "pruner", pruned = %pruned_changesets, %done, "Pruned account history (changesets from static files)");
let result = HistoryPruneResult {
highest_deleted: highest_deleted_accounts,
last_pruned_block: last_changeset_pruned_block,
pruned_count: pruned_changesets,
done,
};
finalize_history_prune::<_, tables::AccountsHistory, _, _>(
provider,
result,
range_end,
&limiter,
ShardedKey::new,
|a, b| a.key == b.key,
)
.map_err(Into::into)
}
fn prune_database<Provider>(
&self,
provider: &Provider,
input: PruneInput,
range: std::ops::RangeInclusive<BlockNumber>,
range_end: BlockNumber,
) -> Result<SegmentOutput, PrunerError>
where
Provider: DBProvider<Tx: DbTxMut>,
{
let mut limiter = if let Some(limit) = input.limiter.deleted_entries_limit() {
input.limiter.set_deleted_entries_limit(limit / ACCOUNT_HISTORY_TABLES_TO_PRUNE)
} else {
input.limiter
};
if limiter.is_limit_reached() {
return Ok(SegmentOutput::not_done(
limiter.interrupt_reason(),
input.previous_checkpoint.map(SegmentOutputCheckpoint::from_prune_checkpoint),
))
}
// Deleted account changeset keys (account addresses) with the highest block number deleted
// for that key.
//
// The size of this map it's limited by `prune_delete_limit * blocks_since_last_run /
// ACCOUNT_HISTORY_TABLES_TO_PRUNE`, and with current default it's usually `3500 * 5
// / 2`, so 8750 entries. Each entry is `160 bit + 256 bit + 64 bit`, so the total
// size should be up to 0.5MB + some hashmap overhead. `blocks_since_last_run` is
// additionally limited by the `max_reorg_depth`, so no OOM is expected here.
// The size of this map is limited by `prune_delete_limit * blocks_since_last_run /
// ACCOUNT_HISTORY_TABLES_TO_PRUNE`, and with the current defaults it's usually `3500 * 5 /
// 2`, so 8750 entries. Each entry is `160 bit + 64 bit`, so the total size should be up to
// ~0.25MB + some hashmap overhead. `blocks_since_last_run` is additionally limited by the
// `max_reorg_depth`, so no OOM is expected here.
let mut last_changeset_pruned_block = None;
let mut highest_deleted_accounts = FxHashMap::default();
let (pruned_changesets, done) =
provider.tx_ref().prune_table_with_range::<tables::AccountChangeSets>(
@@ -88,69 +192,52 @@ where
last_changeset_pruned_block = Some(block_number);
},
)?;
trace!(target: "pruner", pruned = %pruned_changesets, %done, "Pruned account history (changesets)");
trace!(target: "pruner", pruned = %pruned_changesets, %done, "Pruned account history (changesets from database)");
let last_changeset_pruned_block = last_changeset_pruned_block
// If there's more account changesets to prune, set the checkpoint block number to
// previous, so we could finish pruning its account changesets on the next run.
.map(|block_number| if done { block_number } else { block_number.saturating_sub(1) })
.unwrap_or(range_end);
// Sort highest deleted block numbers by account address and turn them into sharded keys.
// We did not use `BTreeMap` from the beginning, because it's inefficient for hashes.
let highest_sharded_keys = highest_deleted_accounts
.into_iter()
.sorted_unstable() // Unstable is fine because no equal keys exist in the map
.map(|(address, block_number)| {
ShardedKey::new(address, block_number.min(last_changeset_pruned_block))
});
let outcomes = prune_history_indices::<Provider, tables::AccountsHistory, _>(
let result = HistoryPruneResult {
highest_deleted: highest_deleted_accounts,
last_pruned_block: last_changeset_pruned_block,
pruned_count: pruned_changesets,
done,
};
finalize_history_prune::<_, tables::AccountsHistory, _, _>(
provider,
highest_sharded_keys,
result,
range_end,
&limiter,
ShardedKey::new,
|a, b| a.key == b.key,
)?;
trace!(target: "pruner", ?outcomes, %done, "Pruned account history (indices)");
let progress = limiter.progress(done);
Ok(SegmentOutput {
progress,
pruned: pruned_changesets + outcomes.deleted,
checkpoint: Some(SegmentOutputCheckpoint {
block_number: Some(last_changeset_pruned_block),
tx_number: None,
}),
})
)
.map_err(Into::into)
}
}
#[cfg(test)]
mod tests {
use crate::segments::{
user::account_history::ACCOUNT_HISTORY_TABLES_TO_PRUNE, AccountHistory, PruneInput,
PruneLimiter, Segment, SegmentOutput,
};
use super::ACCOUNT_HISTORY_TABLES_TO_PRUNE;
use crate::segments::{AccountHistory, PruneInput, PruneLimiter, Segment, SegmentOutput};
use alloy_primitives::{BlockNumber, B256};
use assert_matches::assert_matches;
use reth_db_api::{tables, BlockNumberList};
use reth_db_api::{models::StorageSettings, tables, BlockNumberList};
use reth_provider::{DBProvider, DatabaseProviderFactory, PruneCheckpointReader};
use reth_prune_types::{
PruneCheckpoint, PruneInterruptReason, PruneMode, PruneProgress, PruneSegment,
};
use reth_stages::test_utils::{StorageKind, TestStageDB};
use reth_storage_api::StorageSettingsCache;
use reth_testing_utils::generators::{
self, random_block_range, random_changeset_range, random_eoa_accounts, BlockRangeParams,
};
use std::{collections::BTreeMap, ops::AddAssign};
#[test]
fn prune() {
fn prune_legacy() {
let db = TestStageDB::default();
let mut rng = generators::rng();
let blocks = random_block_range(
&mut rng,
1..=5000,
0..=5000,
BlockRangeParams { parent: Some(B256::ZERO), tx_count: 0..1, ..Default::default() },
);
db.insert_blocks(blocks.iter(), StorageKind::Database(None)).expect("insert blocks");
@@ -202,6 +289,9 @@ mod tests {
let segment = AccountHistory::new(prune_mode);
let provider = db.factory.database_provider_rw().unwrap();
provider.set_storage_settings_cache(
StorageSettings::default().with_account_changesets_in_static_files(false),
);
let result = segment.prune(&provider, input).unwrap();
limiter.increment_deleted_entries_count_by(result.pruned);
@@ -239,20 +329,18 @@ mod tests {
.map(|(i, _)| i)
.unwrap_or_default();
let mut pruned_changesets = changesets
.iter()
// Skip what we've pruned so far, subtracting one to get last pruned block
// number further down
.skip(pruned.saturating_sub(1));
// Skip what we've pruned so far, subtracting one to get last pruned block number
// further down
let mut pruned_changesets = changesets.iter().skip(pruned.saturating_sub(1));
let last_pruned_block_number = pruned_changesets
.next()
.map(|(block_number, _)| if result.progress.is_finished() {
*block_number
} else {
block_number.saturating_sub(1)
} as BlockNumber)
.unwrap_or(to_block);
.next()
.map(|(block_number, _)| if result.progress.is_finished() {
*block_number
} else {
block_number.saturating_sub(1)
} as BlockNumber)
.unwrap_or(to_block);
let pruned_changesets = pruned_changesets.fold(
BTreeMap::<_, Vec<_>>::new(),
@@ -303,4 +391,152 @@ mod tests {
test_prune(998, 2, (PruneProgress::Finished, 998));
test_prune(1400, 3, (PruneProgress::Finished, 804));
}
#[test]
fn prune_static_file() {
let db = TestStageDB::default();
let mut rng = generators::rng();
let blocks = random_block_range(
&mut rng,
0..=5000,
BlockRangeParams { parent: Some(B256::ZERO), tx_count: 0..1, ..Default::default() },
);
db.insert_blocks(blocks.iter(), StorageKind::Database(None)).expect("insert blocks");
let accounts = random_eoa_accounts(&mut rng, 2).into_iter().collect::<BTreeMap<_, _>>();
let (changesets, _) = random_changeset_range(
&mut rng,
blocks.iter(),
accounts.into_iter().map(|(addr, acc)| (addr, (acc, Vec::new()))),
0..0,
0..0,
);
db.insert_changesets_to_static_files(changesets.clone(), None)
.expect("insert changesets to static files");
db.insert_history(changesets.clone(), None).expect("insert history");
let account_occurrences = db.table::<tables::AccountsHistory>().unwrap().into_iter().fold(
BTreeMap::<_, usize>::new(),
|mut map, (key, _)| {
map.entry(key.key).or_default().add_assign(1);
map
},
);
assert!(account_occurrences.into_iter().any(|(_, occurrences)| occurrences > 1));
let original_shards = db.table::<tables::AccountsHistory>().unwrap();
let test_prune =
|to_block: BlockNumber, run: usize, expected_result: (PruneProgress, usize)| {
let prune_mode = PruneMode::Before(to_block);
let deleted_entries_limit = 2000;
let mut limiter =
PruneLimiter::default().set_deleted_entries_limit(deleted_entries_limit);
let input = PruneInput {
previous_checkpoint: db
.factory
.provider()
.unwrap()
.get_prune_checkpoint(PruneSegment::AccountHistory)
.unwrap(),
to_block,
limiter: limiter.clone(),
};
let segment = AccountHistory::new(prune_mode);
let provider = db.factory.database_provider_rw().unwrap();
provider.set_storage_settings_cache(
StorageSettings::default().with_account_changesets_in_static_files(true),
);
let result = segment.prune(&provider, input).unwrap();
limiter.increment_deleted_entries_count_by(result.pruned);
assert_matches!(
result,
SegmentOutput {progress, pruned, checkpoint: Some(_)}
if (progress, pruned) == expected_result
);
segment
.save_checkpoint(
&provider,
result.checkpoint.unwrap().as_prune_checkpoint(prune_mode),
)
.unwrap();
provider.commit().expect("commit");
let changesets = changesets
.iter()
.enumerate()
.flat_map(|(block_number, changeset)| {
changeset.iter().map(move |change| (block_number, change))
})
.collect::<Vec<_>>();
#[expect(clippy::skip_while_next)]
let pruned = changesets
.iter()
.enumerate()
.skip_while(|(i, (block_number, _))| {
*i < deleted_entries_limit / ACCOUNT_HISTORY_TABLES_TO_PRUNE * run &&
*block_number <= to_block as usize
})
.next()
.map(|(i, _)| i)
.unwrap_or_default();
// Skip what we've pruned so far, subtracting one to get last pruned block number
// further down
let mut pruned_changesets = changesets.iter().skip(pruned.saturating_sub(1));
let last_pruned_block_number = pruned_changesets
.next()
.map(|(block_number, _)| {
(if result.progress.is_finished() {
*block_number
} else {
block_number.saturating_sub(1)
}) as BlockNumber
})
.unwrap_or(to_block);
let actual_shards = db.table::<tables::AccountsHistory>().unwrap();
let expected_shards = original_shards
.iter()
.filter(|(key, _)| key.highest_block_number > last_pruned_block_number)
.map(|(key, blocks)| {
let new_blocks =
blocks.iter().skip_while(|block| *block <= last_pruned_block_number);
(key.clone(), BlockNumberList::new_pre_sorted(new_blocks))
})
.collect::<Vec<_>>();
assert_eq!(actual_shards, expected_shards);
assert_eq!(
db.factory
.provider()
.unwrap()
.get_prune_checkpoint(PruneSegment::AccountHistory)
.unwrap(),
Some(PruneCheckpoint {
block_number: Some(last_pruned_block_number),
tx_number: None,
prune_mode
})
);
};
test_prune(
998,
1,
(PruneProgress::HasMoreData(PruneInterruptReason::DeletedEntriesLimitReached), 1000),
);
test_prune(998, 2, (PruneProgress::Finished, 1000));
test_prune(1400, 3, (PruneProgress::Finished, 804));
}
}

View File

@@ -1,4 +1,6 @@
use crate::PruneLimiter;
use alloy_primitives::BlockNumber;
use itertools::Itertools;
use reth_db_api::{
cursor::{DbCursorRO, DbCursorRW},
models::ShardedKey,
@@ -7,6 +9,8 @@ use reth_db_api::{
BlockNumberList, DatabaseError, RawKey, RawTable, RawValue,
};
use reth_provider::DBProvider;
use reth_prune_types::{SegmentOutput, SegmentOutputCheckpoint};
use rustc_hash::FxHashMap;
enum PruneShardOutcome {
Deleted,
@@ -21,6 +25,65 @@ pub(crate) struct PrunedIndices {
pub(crate) unchanged: usize,
}
/// Result of pruning history changesets, used to build the final output.
pub(crate) struct HistoryPruneResult<K> {
/// Map of the highest deleted changeset keys to their block numbers.
pub(crate) highest_deleted: FxHashMap<K, BlockNumber>,
/// The last block number that had changesets pruned.
pub(crate) last_pruned_block: Option<BlockNumber>,
/// Number of changesets pruned.
pub(crate) pruned_count: usize,
/// Whether pruning is complete.
pub(crate) done: bool,
}
/// Finalizes history pruning by sorting sharded keys, pruning history indices, and building output.
///
/// This is shared between static file and database pruning for both account and storage history.
pub(crate) fn finalize_history_prune<Provider, T, K, SK>(
provider: &Provider,
result: HistoryPruneResult<K>,
range_end: BlockNumber,
limiter: &PruneLimiter,
to_sharded_key: impl Fn(K, BlockNumber) -> T::Key,
key_matches: impl Fn(&T::Key, &T::Key) -> bool,
) -> Result<SegmentOutput, DatabaseError>
where
Provider: DBProvider<Tx: DbTxMut>,
T: Table<Value = BlockNumberList>,
T::Key: AsRef<ShardedKey<SK>>,
K: Ord,
{
let HistoryPruneResult { highest_deleted, last_pruned_block, pruned_count, done } = result;
// If there's more changesets to prune, set the checkpoint block number to previous,
// so we could finish pruning its changesets on the next run.
let last_changeset_pruned_block = last_pruned_block
.map(|block_number| if done { block_number } else { block_number.saturating_sub(1) })
.unwrap_or(range_end);
// Sort highest deleted block numbers and turn them into sharded keys.
// We use `sorted_unstable` because no equal keys exist in the map.
let highest_sharded_keys =
highest_deleted.into_iter().sorted_unstable().map(|(key, block_number)| {
to_sharded_key(key, block_number.min(last_changeset_pruned_block))
});
let outcomes =
prune_history_indices::<Provider, T, _>(provider, highest_sharded_keys, key_matches)?;
let progress = limiter.progress(done);
Ok(SegmentOutput {
progress,
pruned: pruned_count + outcomes.deleted,
checkpoint: Some(SegmentOutputCheckpoint {
block_number: Some(last_changeset_pruned_block),
tx_number: None,
}),
})
}
/// Prune history indices according to the provided list of highest sharded keys.
///
/// Returns total number of deleted, updated and unchanged entities.

View File

@@ -1,20 +1,27 @@
use crate::{
db_ext::DbTxPruneExt,
segments::{user::history::prune_history_indices, PruneInput, Segment, SegmentOutput},
segments::{
user::history::{finalize_history_prune, HistoryPruneResult},
PruneInput, Segment,
},
PrunerError,
};
use itertools::Itertools;
use alloy_primitives::{Address, BlockNumber, B256};
use reth_db_api::{
models::{storage_sharded_key::StorageShardedKey, BlockNumberAddress},
tables,
transaction::DbTxMut,
};
use reth_provider::DBProvider;
use reth_prune_types::{PruneMode, PrunePurpose, PruneSegment, SegmentOutputCheckpoint};
use reth_provider::{DBProvider, EitherWriter, StaticFileProviderFactory};
use reth_prune_types::{
PruneMode, PrunePurpose, PruneSegment, SegmentOutput, SegmentOutputCheckpoint,
};
use reth_static_file_types::StaticFileSegment;
use reth_storage_api::{StorageChangeSetReader, StorageSettingsCache};
use rustc_hash::FxHashMap;
use tracing::{instrument, trace};
/// Number of storage history tables to prune in one step
/// Number of storage history tables to prune in one step.
///
/// Storage History consists of two tables: [`tables::StorageChangeSets`] and
/// [`tables::StoragesHistory`]. We want to prune them to the same block number.
@@ -33,7 +40,10 @@ impl StorageHistory {
impl<Provider> Segment<Provider> for StorageHistory
where
Provider: DBProvider<Tx: DbTxMut>,
Provider: DBProvider<Tx: DbTxMut>
+ StaticFileProviderFactory
+ StorageChangeSetReader
+ StorageSettingsCache,
{
fn segment(&self) -> PruneSegment {
PruneSegment::StorageHistory
@@ -58,11 +68,32 @@ where
};
let range_end = *range.end();
if EitherWriter::storage_changesets_destination(provider).is_static_file() {
self.prune_static_files(provider, input, range, range_end)
} else {
self.prune_database(provider, input, range, range_end)
}
}
}
impl StorageHistory {
/// Prunes storage history when changesets are stored in static files.
fn prune_static_files<Provider>(
&self,
provider: &Provider,
input: PruneInput,
range: std::ops::RangeInclusive<BlockNumber>,
range_end: BlockNumber,
) -> Result<SegmentOutput, PrunerError>
where
Provider: DBProvider<Tx: DbTxMut> + StaticFileProviderFactory,
{
let mut limiter = if let Some(limit) = input.limiter.deleted_entries_limit() {
input.limiter.set_deleted_entries_limit(limit / STORAGE_HISTORY_TABLES_TO_PRUNE)
} else {
input.limiter
};
if limiter.is_limit_reached() {
return Ok(SegmentOutput::not_done(
limiter.interrupt_reason(),
@@ -70,15 +101,90 @@ where
))
}
// The size of this map is limited by `prune_delete_limit * blocks_since_last_run /
// STORAGE_HISTORY_TABLES_TO_PRUNE`, and with current defaults it's usually `3500 * 5
// / 2`, so 8750 entries. Each entry is `160 bit + 256 bit + 64 bit`, so the total
// size should be up to ~0.5MB + some hashmap overhead. `blocks_since_last_run` is
// additionally limited by the `max_reorg_depth`, so no OOM is expected here.
let mut highest_deleted_storages = FxHashMap::default();
let mut last_changeset_pruned_block = None;
let mut pruned_changesets = 0;
let mut done = true;
let walker = provider.static_file_provider().walk_storage_changeset_range(range);
for result in walker {
if limiter.is_limit_reached() {
done = false;
break;
}
let (block_address, entry) = result?;
let block_number = block_address.block_number();
let address = block_address.address();
highest_deleted_storages.insert((address, entry.key), block_number);
last_changeset_pruned_block = Some(block_number);
pruned_changesets += 1;
limiter.increment_deleted_entries_count();
}
// Delete static file jars below the pruned block
if let Some(last_block) = last_changeset_pruned_block {
provider
.static_file_provider()
.delete_segment_below_block(StaticFileSegment::StorageChangeSets, last_block + 1)?;
}
trace!(target: "pruner", pruned = %pruned_changesets, %done, "Pruned storage history (changesets from static files)");
let result = HistoryPruneResult {
highest_deleted: highest_deleted_storages,
last_pruned_block: last_changeset_pruned_block,
pruned_count: pruned_changesets,
done,
};
finalize_history_prune::<_, tables::StoragesHistory, (Address, B256), _>(
provider,
result,
range_end,
&limiter,
|(address, storage_key), block_number| {
StorageShardedKey::new(address, storage_key, block_number)
},
|a, b| a.address == b.address && a.sharded_key.key == b.sharded_key.key,
)
.map_err(Into::into)
}
fn prune_database<Provider>(
&self,
provider: &Provider,
input: PruneInput,
range: std::ops::RangeInclusive<BlockNumber>,
range_end: BlockNumber,
) -> Result<SegmentOutput, PrunerError>
where
Provider: DBProvider<Tx: DbTxMut>,
{
let mut limiter = if let Some(limit) = input.limiter.deleted_entries_limit() {
input.limiter.set_deleted_entries_limit(limit / STORAGE_HISTORY_TABLES_TO_PRUNE)
} else {
input.limiter
};
if limiter.is_limit_reached() {
return Ok(SegmentOutput::not_done(
limiter.interrupt_reason(),
input.previous_checkpoint.map(SegmentOutputCheckpoint::from_prune_checkpoint),
))
}
// Deleted storage changeset keys (account addresses and storage slots) with the highest
// block number deleted for that key.
//
// The size of this map it's limited by `prune_delete_limit * blocks_since_last_run /
// STORAGE_HISTORY_TABLES_TO_PRUNE`, and with current default it's usually `3500 * 5
// The size of this map is limited by `prune_delete_limit * blocks_since_last_run /
// STORAGE_HISTORY_TABLES_TO_PRUNE`, and with current defaults it's usually `3500 * 5
// / 2`, so 8750 entries. Each entry is `160 bit + 256 bit + 64 bit`, so the total
// size should be up to 0.5MB + some hashmap overhead. `blocks_since_last_run` is
// size should be up to ~0.5MB + some hashmap overhead. `blocks_since_last_run` is
// additionally limited by the `max_reorg_depth`, so no OOM is expected here.
let mut last_changeset_pruned_block = None;
let mut highest_deleted_storages = FxHashMap::default();
let (pruned_changesets, done) =
provider.tx_ref().prune_table_with_range::<tables::StorageChangeSets>(
@@ -92,64 +198,46 @@ where
)?;
trace!(target: "pruner", deleted = %pruned_changesets, %done, "Pruned storage history (changesets)");
let last_changeset_pruned_block = last_changeset_pruned_block
// If there's more storage changesets to prune, set the checkpoint block number to
// previous, so we could finish pruning its storage changesets on the next run.
.map(|block_number| if done { block_number } else { block_number.saturating_sub(1) })
.unwrap_or(range_end);
// Sort highest deleted block numbers by account address and storage key and turn them into
// sharded keys.
// We did not use `BTreeMap` from the beginning, because it's inefficient for hashes.
let highest_sharded_keys = highest_deleted_storages
.into_iter()
.sorted_unstable() // Unstable is fine because no equal keys exist in the map
.map(|((address, storage_key), block_number)| {
StorageShardedKey::new(
address,
storage_key,
block_number.min(last_changeset_pruned_block),
)
});
let outcomes = prune_history_indices::<Provider, tables::StoragesHistory, _>(
let result = HistoryPruneResult {
highest_deleted: highest_deleted_storages,
last_pruned_block: last_changeset_pruned_block,
pruned_count: pruned_changesets,
done,
};
finalize_history_prune::<_, tables::StoragesHistory, (Address, B256), _>(
provider,
highest_sharded_keys,
result,
range_end,
&limiter,
|(address, storage_key), block_number| {
StorageShardedKey::new(address, storage_key, block_number)
},
|a, b| a.address == b.address && a.sharded_key.key == b.sharded_key.key,
)?;
trace!(target: "pruner", ?outcomes, %done, "Pruned storage history (indices)");
let progress = limiter.progress(done);
Ok(SegmentOutput {
progress,
pruned: pruned_changesets + outcomes.deleted,
checkpoint: Some(SegmentOutputCheckpoint {
block_number: Some(last_changeset_pruned_block),
tx_number: None,
}),
})
)
.map_err(Into::into)
}
}
#[cfg(test)]
mod tests {
use crate::segments::{
user::storage_history::STORAGE_HISTORY_TABLES_TO_PRUNE, PruneInput, PruneLimiter, Segment,
SegmentOutput, StorageHistory,
};
use super::STORAGE_HISTORY_TABLES_TO_PRUNE;
use crate::segments::{PruneInput, PruneLimiter, Segment, SegmentOutput, StorageHistory};
use alloy_primitives::{BlockNumber, B256};
use assert_matches::assert_matches;
use reth_db_api::{tables, BlockNumberList};
use reth_db_api::{models::StorageSettings, tables, BlockNumberList};
use reth_provider::{DBProvider, DatabaseProviderFactory, PruneCheckpointReader};
use reth_prune_types::{PruneCheckpoint, PruneMode, PruneProgress, PruneSegment};
use reth_prune_types::{
PruneCheckpoint, PruneInterruptReason, PruneMode, PruneProgress, PruneSegment,
};
use reth_stages::test_utils::{StorageKind, TestStageDB};
use reth_storage_api::StorageSettingsCache;
use reth_testing_utils::generators::{
self, random_block_range, random_changeset_range, random_eoa_accounts, BlockRangeParams,
};
use std::{collections::BTreeMap, ops::AddAssign};
#[test]
fn prune() {
fn prune_legacy() {
let db = TestStageDB::default();
let mut rng = generators::rng();
@@ -208,6 +296,9 @@ mod tests {
let segment = StorageHistory::new(prune_mode);
let provider = db.factory.database_provider_rw().unwrap();
provider.set_storage_settings_cache(
StorageSettings::default().with_storage_changesets_in_static_files(false),
);
let result = segment.prune(&provider, input).unwrap();
limiter.increment_deleted_entries_count_by(result.pruned);
@@ -247,19 +338,19 @@ mod tests {
.map(|(i, _)| i)
.unwrap_or_default();
let mut pruned_changesets = changesets
.iter()
// Skip what we've pruned so far, subtracting one to get last pruned block number
// further down
.skip(pruned.saturating_sub(1));
// Skip what we've pruned so far, subtracting one to get last pruned block number
// further down
let mut pruned_changesets = changesets.iter().skip(pruned.saturating_sub(1));
let last_pruned_block_number = pruned_changesets
.next()
.map(|(block_number, _, _)| if result.progress.is_finished() {
*block_number
} else {
block_number.saturating_sub(1)
} as BlockNumber)
.map(|(block_number, _, _)| {
(if result.progress.is_finished() {
*block_number
} else {
block_number.saturating_sub(1)
}) as BlockNumber
})
.unwrap_or(to_block);
let pruned_changesets = pruned_changesets.fold(
@@ -306,14 +397,160 @@ mod tests {
test_prune(
998,
1,
(
PruneProgress::HasMoreData(
reth_prune_types::PruneInterruptReason::DeletedEntriesLimitReached,
),
500,
),
(PruneProgress::HasMoreData(PruneInterruptReason::DeletedEntriesLimitReached), 500),
);
test_prune(998, 2, (PruneProgress::Finished, 499));
test_prune(1200, 3, (PruneProgress::Finished, 202));
}
#[test]
fn prune_static_file() {
let db = TestStageDB::default();
let mut rng = generators::rng();
let blocks = random_block_range(
&mut rng,
0..=5000,
BlockRangeParams { parent: Some(B256::ZERO), tx_count: 0..1, ..Default::default() },
);
db.insert_blocks(blocks.iter(), StorageKind::Database(None)).expect("insert blocks");
let accounts = random_eoa_accounts(&mut rng, 2).into_iter().collect::<BTreeMap<_, _>>();
let (changesets, _) = random_changeset_range(
&mut rng,
blocks.iter(),
accounts.into_iter().map(|(addr, acc)| (addr, (acc, Vec::new()))),
1..2,
1..2,
);
db.insert_changesets_to_static_files(changesets.clone(), None)
.expect("insert changesets to static files");
db.insert_history(changesets.clone(), None).expect("insert history");
let storage_occurrences = db.table::<tables::StoragesHistory>().unwrap().into_iter().fold(
BTreeMap::<_, usize>::new(),
|mut map, (key, _)| {
map.entry((key.address, key.sharded_key.key)).or_default().add_assign(1);
map
},
);
assert!(storage_occurrences.into_iter().any(|(_, occurrences)| occurrences > 1));
let original_shards = db.table::<tables::StoragesHistory>().unwrap();
let test_prune = |to_block: BlockNumber,
run: usize,
expected_result: (PruneProgress, usize)| {
let prune_mode = PruneMode::Before(to_block);
let deleted_entries_limit = 1000;
let mut limiter =
PruneLimiter::default().set_deleted_entries_limit(deleted_entries_limit);
let input = PruneInput {
previous_checkpoint: db
.factory
.provider()
.unwrap()
.get_prune_checkpoint(PruneSegment::StorageHistory)
.unwrap(),
to_block,
limiter: limiter.clone(),
};
let segment = StorageHistory::new(prune_mode);
let provider = db.factory.database_provider_rw().unwrap();
provider.set_storage_settings_cache(
StorageSettings::default().with_storage_changesets_in_static_files(true),
);
let result = segment.prune(&provider, input).unwrap();
limiter.increment_deleted_entries_count_by(result.pruned);
assert_matches!(
result,
SegmentOutput {progress, pruned, checkpoint: Some(_)}
if (progress, pruned) == expected_result
);
segment
.save_checkpoint(
&provider,
result.checkpoint.unwrap().as_prune_checkpoint(prune_mode),
)
.unwrap();
provider.commit().expect("commit");
let changesets = changesets
.iter()
.enumerate()
.flat_map(|(block_number, changeset)| {
changeset.iter().flat_map(move |(address, _, entries)| {
entries.iter().map(move |entry| (block_number, address, entry))
})
})
.collect::<Vec<_>>();
#[expect(clippy::skip_while_next)]
let pruned = changesets
.iter()
.enumerate()
.skip_while(|(i, (block_number, _, _))| {
*i < deleted_entries_limit / STORAGE_HISTORY_TABLES_TO_PRUNE * run &&
*block_number <= to_block as usize
})
.next()
.map(|(i, _)| i)
.unwrap_or_default();
// Skip what we've pruned so far, subtracting one to get last pruned block number
// further down
let mut pruned_changesets = changesets.iter().skip(pruned.saturating_sub(1));
let last_pruned_block_number = pruned_changesets
.next()
.map(|(block_number, _, _)| {
(if result.progress.is_finished() {
*block_number
} else {
block_number.saturating_sub(1)
}) as BlockNumber
})
.unwrap_or(to_block);
let actual_shards = db.table::<tables::StoragesHistory>().unwrap();
let expected_shards = original_shards
.iter()
.filter(|(key, _)| key.sharded_key.highest_block_number > last_pruned_block_number)
.map(|(key, blocks)| {
let new_blocks =
blocks.iter().skip_while(|block| *block <= last_pruned_block_number);
(key.clone(), BlockNumberList::new_pre_sorted(new_blocks))
})
.collect::<Vec<_>>();
assert_eq!(actual_shards, expected_shards);
assert_eq!(
db.factory
.provider()
.unwrap()
.get_prune_checkpoint(PruneSegment::StorageHistory)
.unwrap(),
Some(PruneCheckpoint {
block_number: Some(last_pruned_block_number),
tx_number: None,
prune_mode
})
);
};
test_prune(
998,
1,
(PruneProgress::HasMoreData(PruneInterruptReason::DeletedEntriesLimitReached), 500),
);
test_prune(998, 2, (PruneProgress::Finished, 500));
test_prune(1200, 3, (PruneProgress::Finished, 202));
}
}

View File

@@ -24,9 +24,9 @@ pub enum PruneSegment {
Receipts,
/// Prune segment responsible for some rows in `Receipts` table filtered by logs.
ContractLogs,
/// Prune segment responsible for the `AccountChangeSets` and `AccountsHistory` tables.
/// Prunes account changesets (static files/MDBX) and `AccountsHistory`.
AccountHistory,
/// Prune segment responsible for the `StorageChangeSets` and `StoragesHistory` tables.
/// Prunes storage changesets (static files/MDBX) and `StoragesHistory`.
StorageHistory,
#[deprecated = "Variant indexes cannot be changed"]
#[strum(disabled)]

View File

@@ -122,6 +122,7 @@ impl RethRpcServerConfig for RpcServerArgs {
max_receipts: self.rpc_state_cache.max_receipts,
max_headers: self.rpc_state_cache.max_headers,
max_concurrent_db_requests: self.rpc_state_cache.max_concurrent_db_requests,
max_cached_tx_hashes: self.rpc_state_cache.max_cached_tx_hashes,
}
}

View File

@@ -41,7 +41,6 @@ metrics.workspace = true
async-trait.workspace = true
jsonrpsee-core.workspace = true
jsonrpsee-types.workspace = true
parking_lot.workspace = true
serde.workspace = true
thiserror.workspace = true
tracing.workspace = true

View File

@@ -1,274 +0,0 @@
//! Block Access List (BAL) cache for EIP-7928.
//!
//! This module provides an in-memory cache for storing Block Access Lists received via
//! the Engine API. BALs are stored for valid payloads and can be retrieved via
//! `engine_getBALsByHashV1` and `engine_getBALsByRangeV1`.
//!
//! According to EIP-7928, the EL MUST retain BALs for at least the duration of the
//! weak subjectivity period (~3533 epochs) to support synchronization with re-execution.
//! This initial implementation uses a simple in-memory cache with configurable capacity.
use alloy_primitives::{BlockHash, BlockNumber, Bytes};
use parking_lot::RwLock;
use reth_metrics::{
metrics::{Counter, Gauge},
Metrics,
};
use std::{
collections::{BTreeMap, HashMap},
sync::Arc,
};
/// Default capacity for the BAL cache.
///
/// This is a conservative default - production deployments should configure based on
/// weak subjectivity period requirements (~3533 epochs ≈ 113,000 blocks).
const DEFAULT_BAL_CACHE_CAPACITY: u32 = 1024;
/// In-memory cache for Block Access Lists (BALs).
///
/// Provides O(1) lookups by block hash and O(log n) range queries by block number.
/// Evicts the oldest (lowest) block numbers when capacity is exceeded.
///
/// This type is cheaply cloneable as it wraps an `Arc` internally.
#[derive(Debug, Clone)]
pub struct BalCache {
inner: Arc<BalCacheInner>,
}
#[derive(Debug)]
struct BalCacheInner {
/// Maximum number of entries to store.
capacity: u32,
/// Mapping from block hash to BAL bytes.
entries: RwLock<HashMap<BlockHash, Bytes>>,
/// Index mapping block number to block hash for range queries.
/// Uses `BTreeMap` for efficient range iteration and eviction of oldest blocks.
block_index: RwLock<BTreeMap<BlockNumber, BlockHash>>,
/// Cache metrics.
metrics: BalCacheMetrics,
}
impl BalCache {
/// Creates a new BAL cache with the default capacity.
pub fn new() -> Self {
Self::with_capacity(DEFAULT_BAL_CACHE_CAPACITY)
}
/// Creates a new BAL cache with the specified capacity.
pub fn with_capacity(capacity: u32) -> Self {
Self {
inner: Arc::new(BalCacheInner {
capacity,
entries: RwLock::new(HashMap::new()),
block_index: RwLock::new(BTreeMap::new()),
metrics: BalCacheMetrics::default(),
}),
}
}
/// Inserts a BAL into the cache.
///
/// If a different hash already exists for this block number (reorg), the old entry
/// is removed first. If the cache is at capacity, the oldest block number is evicted.
pub fn insert(&self, block_hash: BlockHash, block_number: BlockNumber, bal: Bytes) {
let mut entries = self.inner.entries.write();
let mut block_index = self.inner.block_index.write();
// If this block number already has a different hash, remove the old entry
if let Some(old_hash) = block_index.get(&block_number) &&
*old_hash != block_hash
{
entries.remove(old_hash);
}
// Evict oldest block if at capacity and this is a new entry
if !entries.contains_key(&block_hash) &&
entries.len() as u32 >= self.inner.capacity &&
let Some((&oldest_num, &oldest_hash)) = block_index.first_key_value()
{
entries.remove(&oldest_hash);
block_index.remove(&oldest_num);
}
entries.insert(block_hash, bal);
block_index.insert(block_number, block_hash);
self.inner.metrics.inserts.increment(1);
self.inner.metrics.count.set(entries.len() as f64);
}
/// Retrieves BALs for the given block hashes.
///
/// Returns a vector with the same length as `block_hashes`, where each element
/// is `Some(bal)` if found or `None` if not in cache.
pub fn get_by_hashes(&self, block_hashes: &[BlockHash]) -> Vec<Option<Bytes>> {
let entries = self.inner.entries.read();
block_hashes
.iter()
.map(|hash| {
let result = entries.get(hash).cloned();
if result.is_some() {
self.inner.metrics.hits.increment(1);
} else {
self.inner.metrics.misses.increment(1);
}
result
})
.collect()
}
/// Retrieves BALs for a range of blocks starting at `start` for `count` blocks.
///
/// Returns a vector of contiguous BALs in block number order, stopping at the first
/// missing block. This ensures the caller knows the returned BALs correspond to
/// blocks `[start, start + len)`.
pub fn get_by_range(&self, start: BlockNumber, count: u64) -> Vec<Bytes> {
let entries = self.inner.entries.read();
let block_index = self.inner.block_index.read();
let mut result = Vec::new();
for block_num in start..start.saturating_add(count) {
let Some(hash) = block_index.get(&block_num) else {
break;
};
let Some(bal) = entries.get(hash) else {
break;
};
result.push(bal.clone());
}
result
}
/// Returns the number of entries in the cache.
#[cfg(test)]
fn len(&self) -> usize {
self.inner.entries.read().len()
}
}
impl Default for BalCache {
fn default() -> Self {
Self::new()
}
}
/// Metrics for the BAL cache.
#[derive(Metrics)]
#[metrics(scope = "engine.bal_cache")]
struct BalCacheMetrics {
/// The total number of BALs in the cache.
count: Gauge,
/// The number of cache inserts.
inserts: Counter,
/// The number of cache hits.
hits: Counter,
/// The number of cache misses.
misses: Counter,
}
#[cfg(test)]
mod tests {
use super::*;
use alloy_primitives::B256;
#[test]
fn test_insert_and_get_by_hash() {
let cache = BalCache::with_capacity(10);
let hash1 = B256::random();
let hash2 = B256::random();
let bal1 = Bytes::from_static(b"bal1");
let bal2 = Bytes::from_static(b"bal2");
cache.insert(hash1, 1, bal1.clone());
cache.insert(hash2, 2, bal2.clone());
let results = cache.get_by_hashes(&[hash1, hash2, B256::random()]);
assert_eq!(results.len(), 3);
assert_eq!(results[0], Some(bal1));
assert_eq!(results[1], Some(bal2));
assert_eq!(results[2], None);
}
#[test]
fn test_get_by_range() {
let cache = BalCache::with_capacity(10);
for i in 1..=5 {
let hash = B256::random();
let bal = Bytes::from(format!("bal{i}").into_bytes());
cache.insert(hash, i, bal);
}
let results = cache.get_by_range(2, 3);
assert_eq!(results.len(), 3);
}
#[test]
fn test_get_by_range_stops_at_gap() {
let cache = BalCache::with_capacity(10);
// Insert blocks 1, 2, 4, 5 (missing block 3)
for i in [1, 2, 4, 5] {
let hash = B256::random();
let bal = Bytes::from(format!("bal{i}").into_bytes());
cache.insert(hash, i, bal);
}
// Requesting range starting at 1 should stop at the gap (block 3)
let results = cache.get_by_range(1, 5);
assert_eq!(results.len(), 2); // Only blocks 1 and 2
// Requesting range starting at 4 should return 4 and 5
let results = cache.get_by_range(4, 3);
assert_eq!(results.len(), 2);
}
#[test]
fn test_eviction_oldest_first() {
let cache = BalCache::with_capacity(3);
// Insert blocks 10, 20, 30
for i in [10, 20, 30] {
let hash = B256::random();
cache.insert(hash, i, Bytes::from_static(b"bal"));
}
assert_eq!(cache.len(), 3);
// Insert block 40, should evict block 10 (oldest/lowest)
let hash40 = B256::random();
cache.insert(hash40, 40, Bytes::from_static(b"bal40"));
assert_eq!(cache.len(), 3);
// Block 10 should be gone, block 20 should still be there
let results = cache.get_by_range(10, 1);
assert_eq!(results.len(), 0);
let results = cache.get_by_range(20, 1);
assert_eq!(results.len(), 1);
}
#[test]
fn test_reorg_replaces_hash() {
let cache = BalCache::with_capacity(10);
let hash1 = B256::random();
let hash2 = B256::random();
let bal1 = Bytes::from_static(b"bal1");
let bal2 = Bytes::from_static(b"bal2");
// Insert block 100 with hash1
cache.insert(hash1, 100, bal1.clone());
assert_eq!(cache.get_by_hashes(&[hash1])[0], Some(bal1));
// Reorg: insert block 100 with hash2
cache.insert(hash2, 100, bal2.clone());
// hash1 should be gone, hash2 should be there
assert_eq!(cache.get_by_hashes(&[hash1])[0], None);
assert_eq!(cache.get_by_hashes(&[hash2])[0], Some(bal2));
assert_eq!(cache.len(), 1);
}
}

View File

@@ -3,6 +3,13 @@
use std::collections::HashSet;
use tracing::warn;
/// Critical Engine API method prefixes that warrant warnings on capability mismatches.
///
/// These are essential for block production and chain synchronization. Missing support
/// for these methods indicates a significant version mismatch that operators should address.
const CRITICAL_METHOD_PREFIXES: &[&str] =
&["engine_forkchoiceUpdated", "engine_getPayload", "engine_newPayload"];
/// All Engine API capabilities supported by Reth (Ethereum mainnet).
///
/// See <https://github.com/ethereum/execution-apis/tree/main/src/engine> for updates.
@@ -72,31 +79,52 @@ impl EngineCapabilities {
CapabilityMismatches { missing_in_el, missing_in_cl }
}
/// Logs warnings if CL and EL capabilities don't match.
/// Logs warnings if CL and EL capabilities don't match for critical methods.
///
/// Called during `engine_exchangeCapabilities` to warn operators about
/// version mismatches between the consensus layer and execution layer.
///
/// Only warns about critical methods (`engine_forkchoiceUpdated`, `engine_getPayload`,
/// `engine_newPayload`) that are essential for block production and chain synchronization.
/// Non-critical methods like `engine_getBlobs` are not warned about since not all
/// clients support them.
pub fn log_capability_mismatches(&self, cl_capabilities: &[String]) {
let mismatches = self.get_capability_mismatches(cl_capabilities);
if !mismatches.missing_in_el.is_empty() {
let critical_missing_in_el: Vec<_> =
mismatches.missing_in_el.iter().filter(|m| is_critical_method(m)).cloned().collect();
let critical_missing_in_cl: Vec<_> =
mismatches.missing_in_cl.iter().filter(|m| is_critical_method(m)).cloned().collect();
if !critical_missing_in_el.is_empty() {
warn!(
target: "rpc::engine",
missing = ?mismatches.missing_in_el,
missing = ?critical_missing_in_el,
"CL supports Engine API methods that Reth doesn't. Consider upgrading Reth."
);
}
if !mismatches.missing_in_cl.is_empty() {
if !critical_missing_in_cl.is_empty() {
warn!(
target: "rpc::engine",
missing = ?mismatches.missing_in_cl,
missing = ?critical_missing_in_cl,
"Reth supports Engine API methods that CL doesn't. Consider upgrading your consensus client."
);
}
}
}
/// Returns `true` if the method is critical for block production and chain synchronization.
fn is_critical_method(method: &str) -> bool {
CRITICAL_METHOD_PREFIXES.iter().any(|prefix| {
method.starts_with(prefix) &&
method[prefix.len()..]
.strip_prefix('V')
.is_some_and(|s| s.chars().next().is_some_and(|c| c.is_ascii_digit()))
})
}
impl Default for EngineCapabilities {
fn default() -> Self {
Self::new(CAPABILITIES.iter().copied())
@@ -173,4 +201,20 @@ mod tests {
assert_eq!(result.missing_in_el, vec!["a_other", "z_other"]);
assert_eq!(result.missing_in_cl, vec!["a_method", "z_method"]);
}
#[test]
fn test_is_critical_method() {
assert!(is_critical_method("engine_forkchoiceUpdatedV1"));
assert!(is_critical_method("engine_forkchoiceUpdatedV3"));
assert!(is_critical_method("engine_getPayloadV1"));
assert!(is_critical_method("engine_getPayloadV4"));
assert!(is_critical_method("engine_newPayloadV1"));
assert!(is_critical_method("engine_newPayloadV4"));
assert!(!is_critical_method("engine_getBlobsV1"));
assert!(!is_critical_method("engine_getBlobsV3"));
assert!(!is_critical_method("engine_getPayloadBodiesByHashV1"));
assert!(!is_critical_method("engine_getPayloadBodiesByRangeV1"));
assert!(!is_critical_method("engine_getClientVersionV1"));
}
}

View File

@@ -1,15 +1,13 @@
use crate::{
bal_cache::BalCache, capabilities::EngineCapabilities, metrics::EngineApiMetrics,
EngineApiError, EngineApiResult,
capabilities::EngineCapabilities, metrics::EngineApiMetrics, EngineApiError, EngineApiResult,
};
use alloy_eips::{
eip1898::BlockHashOrNumber,
eip4844::{BlobAndProofV1, BlobAndProofV2},
eip4895::Withdrawals,
eip7685::RequestsOrHash,
BlockNumHash,
};
use alloy_primitives::{BlockHash, BlockNumber, Bytes, B256, U64};
use alloy_primitives::{BlockHash, BlockNumber, B256, U64};
use alloy_rpc_types_engine::{
CancunPayloadFields, ClientVersionV1, ExecutionData, ExecutionPayloadBodiesV1,
ExecutionPayloadBodyV1, ExecutionPayloadInputV2, ExecutionPayloadSidecar, ExecutionPayloadV1,
@@ -23,7 +21,7 @@ use reth_engine_primitives::{ConsensusEngineHandle, EngineApiValidator, EngineTy
use reth_network_api::NetworkInfo;
use reth_payload_builder::PayloadStore;
use reth_payload_primitives::{
validate_payload_timestamp, EngineApiMessageVersion, ExecutionPayload, MessageValidationKind,
validate_payload_timestamp, EngineApiMessageVersion, MessageValidationKind,
PayloadOrAttributes, PayloadTypes,
};
use reth_primitives_traits::{Block, BlockBody};
@@ -98,38 +96,6 @@ where
validator: Validator,
accept_execution_requests_hash: bool,
network: impl NetworkInfo + 'static,
) -> Self {
Self::with_bal_cache(
provider,
chain_spec,
beacon_consensus,
payload_store,
tx_pool,
task_spawner,
client,
capabilities,
validator,
accept_execution_requests_hash,
network,
BalCache::new(),
)
}
/// Create new instance of [`EngineApi`] with a custom BAL cache.
#[expect(clippy::too_many_arguments)]
pub fn with_bal_cache(
provider: Provider,
chain_spec: Arc<ChainSpec>,
beacon_consensus: ConsensusEngineHandle<PayloadT>,
payload_store: PayloadStore<PayloadT>,
tx_pool: Pool,
task_spawner: Box<dyn TaskSpawner>,
client: ClientVersionV1,
capabilities: EngineCapabilities,
validator: Validator,
accept_execution_requests_hash: bool,
network: impl NetworkInfo + 'static,
bal_cache: BalCache,
) -> Self {
let is_syncing = Arc::new(move || network.is_syncing());
let inner = Arc::new(EngineApiInner {
@@ -145,25 +111,10 @@ where
validator,
accept_execution_requests_hash,
is_syncing,
bal_cache,
});
Self { inner }
}
/// Returns a reference to the BAL cache.
pub fn bal_cache(&self) -> &BalCache {
&self.inner.bal_cache
}
/// Caches the BAL if the status is valid.
fn maybe_cache_bal(&self, num_hash: BlockNumHash, bal: Option<Bytes>, status: &PayloadStatus) {
if status.is_valid() &&
let Some(bal) = bal
{
self.inner.bal_cache.insert(num_hash.hash, num_hash.number, bal);
}
}
/// Fetches the client version.
pub fn get_client_version_v1(
&self,
@@ -198,11 +149,7 @@ where
.validator
.validate_version_specific_fields(EngineApiMessageVersion::V1, payload_or_attrs)?;
let num_hash = payload.num_hash();
let bal = payload.block_access_list().cloned();
let status = self.inner.beacon_consensus.new_payload(payload).await?;
self.maybe_cache_bal(num_hash, bal, &status);
Ok(status)
Ok(self.inner.beacon_consensus.new_payload(payload).await?)
}
/// Metered version of `new_payload_v1`.
@@ -230,12 +177,7 @@ where
self.inner
.validator
.validate_version_specific_fields(EngineApiMessageVersion::V2, payload_or_attrs)?;
let num_hash = payload.num_hash();
let bal = payload.block_access_list().cloned();
let status = self.inner.beacon_consensus.new_payload(payload).await?;
self.maybe_cache_bal(num_hash, bal, &status);
Ok(status)
Ok(self.inner.beacon_consensus.new_payload(payload).await?)
}
/// Metered version of `new_payload_v2`.
@@ -264,11 +206,7 @@ where
.validator
.validate_version_specific_fields(EngineApiMessageVersion::V3, payload_or_attrs)?;
let num_hash = payload.num_hash();
let bal = payload.block_access_list().cloned();
let status = self.inner.beacon_consensus.new_payload(payload).await?;
self.maybe_cache_bal(num_hash, bal, &status);
Ok(status)
Ok(self.inner.beacon_consensus.new_payload(payload).await?)
}
/// Metrics version of `new_payload_v3`
@@ -298,11 +236,7 @@ where
.validator
.validate_version_specific_fields(EngineApiMessageVersion::V4, payload_or_attrs)?;
let num_hash = payload.num_hash();
let bal = payload.block_access_list().cloned();
let status = self.inner.beacon_consensus.new_payload(payload).await?;
self.maybe_cache_bal(num_hash, bal, &status);
Ok(status)
Ok(self.inner.beacon_consensus.new_payload(payload).await?)
}
/// Metrics version of `new_payload_v4`
@@ -947,22 +881,6 @@ where
res
}
/// Retrieves BALs for the given block hashes from the cache.
///
/// Returns the RLP-encoded BALs for blocks found in the cache.
/// Missing blocks are returned as empty bytes.
pub fn get_bals_by_hash(&self, block_hashes: Vec<BlockHash>) -> Vec<alloy_primitives::Bytes> {
let results = self.inner.bal_cache.get_by_hashes(&block_hashes);
results.into_iter().map(|opt| opt.unwrap_or_default()).collect()
}
/// Retrieves BALs for a range of blocks from the cache.
///
/// Returns the RLP-encoded BALs for blocks in the range `[start, start + count)`.
pub fn get_bals_by_range(&self, start: u64, count: u64) -> Vec<alloy_primitives::Bytes> {
self.inner.bal_cache.get_by_range(start, count)
}
}
// This is the concrete ethereum engine API implementation.
@@ -1287,10 +1205,12 @@ where
/// See also <https://eips.ethereum.org/EIPS/eip-7928>
async fn get_bals_by_hash_v1(
&self,
block_hashes: Vec<BlockHash>,
_block_hashes: Vec<BlockHash>,
) -> RpcResult<Vec<alloy_primitives::Bytes>> {
trace!(target: "rpc::engine", "Serving engine_getBALsByHashV1");
Ok(self.get_bals_by_hash(block_hashes))
Err(EngineApiError::EngineObjectValidationError(
reth_payload_primitives::EngineObjectValidationError::UnsupportedFork,
))?
}
/// Handler for `engine_getBALsByRangeV1`
@@ -1298,11 +1218,13 @@ where
/// See also <https://eips.ethereum.org/EIPS/eip-7928>
async fn get_bals_by_range_v1(
&self,
start: U64,
count: U64,
_start: U64,
_count: U64,
) -> RpcResult<Vec<alloy_primitives::Bytes>> {
trace!(target: "rpc::engine", "Serving engine_getBALsByRangeV1");
Ok(self.get_bals_by_range(start.to(), count.to()))
Err(EngineApiError::EngineObjectValidationError(
reth_payload_primitives::EngineObjectValidationError::UnsupportedFork,
))?
}
}
@@ -1362,8 +1284,6 @@ struct EngineApiInner<Provider, PayloadT: PayloadTypes, Pool, Validator, ChainSp
accept_execution_requests_hash: bool,
/// Returns `true` if the node is currently syncing.
is_syncing: Arc<dyn Fn() -> bool + Send + Sync>,
/// Cache for Block Access Lists (BALs) per EIP-7928.
bal_cache: BalCache,
}
#[cfg(test)]

View File

@@ -12,10 +12,6 @@
/// The Engine API implementation.
mod engine_api;
/// Block Access List (BAL) cache for EIP-7928.
mod bal_cache;
pub use bal_cache::BalCache;
/// Engine API capabilities.
pub mod capabilities;
pub use capabilities::EngineCapabilities;

View File

@@ -20,8 +20,8 @@ use alloy_rpc_types_eth::{
use futures::Future;
use reth_errors::{ProviderError, RethError};
use reth_evm::{
env::BlockEnvironment, ConfigureEvm, Evm, EvmEnvFor, HaltReasonFor, InspectorFor,
TransactionEnv, TxEnvFor,
env::BlockEnvironment, execute::BlockBuilder, ConfigureEvm, Evm, EvmEnvFor, HaltReasonFor,
InspectorFor, TransactionEnv, TxEnvFor,
};
use reth_node_api::BlockBody;
use reth_primitives_traits::Recovered;
@@ -96,7 +96,37 @@ pub trait EthCall: EstimateCall + Call + LoadPendingBlock + LoadBlock + FullEthA
self.spawn_with_state_at_block(block, move |this, mut db| {
let mut blocks: Vec<SimulatedBlock<RpcBlock<Self::NetworkTypes>>> =
Vec::with_capacity(block_state_calls.len());
// Track previous block number and timestamp for validation
let mut prev_block_number = parent.number();
let mut prev_timestamp = parent.timestamp();
for block in block_state_calls {
// Validate block number ordering if overridden
if let Some(number) = block.block_overrides.as_ref().and_then(|o| o.number) {
let number: u64 = number.try_into().unwrap_or(u64::MAX);
if number <= prev_block_number {
return Err(EthApiError::other(EthSimulateError::BlockNumberInvalid {
got: number,
parent: prev_block_number,
})
.into());
}
}
// Validate timestamp ordering if overridden
if let Some(time) = block
.block_overrides
.as_ref()
.and_then(|o| o.time)
.filter(|&t| t <= prev_timestamp)
{
return Err(EthApiError::other(EthSimulateError::BlockTimestampInvalid {
got: time,
parent: prev_timestamp,
})
.into());
}
let mut evm_env = this
.evm_config()
.next_evm_env(&parent, &this.next_env_attributes(&parent)?)
@@ -116,6 +146,11 @@ pub trait EthCall: EstimateCall + Call + LoadPendingBlock + LoadBlock + FullEthA
let SimBlock { block_overrides, state_overrides, calls } = block;
// Set prevrandao to zero for simulated blocks by default,
// matching spec behavior where MixDigest is zero-initialized.
// If user provides an override, it will be applied by apply_block_overrides.
evm_env.block_env.inner_mut().prevrandao = Some(B256::ZERO);
if let Some(block_overrides) = block_overrides {
// ensure we don't allow uncapped gas limit per block
if let Some(gas_limit_override) = block_overrides.gas_limit &&
@@ -130,8 +165,8 @@ pub trait EthCall: EstimateCall + Call + LoadPendingBlock + LoadBlock + FullEthA
evm_env.block_env.inner_mut(),
);
}
if let Some(state_overrides) = state_overrides {
apply_state_overrides(state_overrides, &mut db)
if let Some(ref state_overrides) = state_overrides {
apply_state_overrides(state_overrides.clone(), &mut db)
.map_err(Self::Error::from_eth_err)?;
}
@@ -152,7 +187,17 @@ pub trait EthCall: EstimateCall + Call + LoadPendingBlock + LoadBlock + FullEthA
}
if txs_without_gas_limit > 0 {
(block_gas_limit - total_specified_gas) / txs_without_gas_limit as u64
// Per spec: "gasLimit: blockGasLimit - soFarUsedGasInBlock"
// Divide remaining gas equally among transactions without gas
let gas_per_tx = (block_gas_limit - total_specified_gas) /
txs_without_gas_limit as u64;
// Cap to RPC gas limit, matching spec behavior
let call_gas_limit = this.call_gas_limit();
if call_gas_limit > 0 {
gas_per_tx.min(call_gas_limit)
} else {
gas_per_tx
}
} else {
0
}
@@ -177,7 +222,16 @@ pub trait EthCall: EstimateCall + Call + LoadPendingBlock + LoadBlock + FullEthA
let evm = this
.evm_config()
.evm_with_env_and_inspector(&mut db, evm_env, inspector);
let builder = this.evm_config().create_block_builder(evm, &parent, ctx);
let mut builder = this.evm_config().create_block_builder(evm, &parent, ctx);
if let Some(ref state_overrides) = state_overrides {
simulate::apply_precompile_overrides(
state_overrides,
builder.evm_mut().precompiles_mut(),
)
.map_err(|e| Self::Error::from_eth_err(EthApiError::other(e)))?;
}
simulate::execute_transactions(
builder,
calls,
@@ -188,7 +242,16 @@ pub trait EthCall: EstimateCall + Call + LoadPendingBlock + LoadBlock + FullEthA
.map_err(map_err)?
} else {
let evm = this.evm_config().evm_with_env(&mut db, evm_env);
let builder = this.evm_config().create_block_builder(evm, &parent, ctx);
let mut builder = this.evm_config().create_block_builder(evm, &parent, ctx);
if let Some(ref state_overrides) = state_overrides {
simulate::apply_precompile_overrides(
state_overrides,
builder.evm_mut().precompiles_mut(),
)
.map_err(|e| Self::Error::from_eth_err(EthApiError::other(e)))?;
}
simulate::execute_transactions(
builder,
calls,
@@ -201,6 +264,10 @@ pub trait EthCall: EstimateCall + Call + LoadPendingBlock + LoadBlock + FullEthA
parent = result.block.clone_sealed_header();
// Update tracking for next iteration's validation
prev_block_number = parent.number();
prev_timestamp = parent.timestamp();
let block = simulate::build_simulated_block::<Self::Error, _>(
result.block,
results,

View File

@@ -79,7 +79,7 @@ where
blob_schedule: chain_spec
.blob_params_at_timestamp(timestamp)
// no blob support, so we set this to original cancun values as defined in eip-4844
.unwrap_or(BlobParams::cancun()),
.unwrap_or_else(BlobParams::cancun),
chain_id: chain_spec.chain().id(),
fork_id,
precompiles,

View File

@@ -310,12 +310,14 @@ pub trait Trace: LoadState<Error: FromEvmError<Self::Evm>> + Call {
.evm_factory()
.create_tracer(&mut db, evm_env, inspector_setup())
.try_trace_many(block.transactions_recovered().take(max_transactions), |ctx| {
#[allow(clippy::needless_update)]
let tx_info = TransactionInfo {
hash: Some(*ctx.tx.tx_hash()),
index: Some(idx),
block_hash: Some(block_hash),
block_number: Some(block_number),
base_fee: Some(base_fee),
..Default::default()
};
idx += 1;

View File

@@ -290,12 +290,14 @@ pub trait EthTransactions: LoadTransaction<Provider: BlockReaderIdExt> {
let block_number = block.number();
let base_fee_per_gas = block.base_fee_per_gas();
if let Some((signer, tx)) = block.transactions_with_sender().nth(index) {
#[allow(clippy::needless_update)]
let tx_info = TransactionInfo {
hash: Some(*tx.tx_hash()),
block_hash: Some(block_hash),
block_number: Some(block_number),
base_fee: base_fee_per_gas,
index: Some(index as u64),
..Default::default()
};
return Ok(Some(
@@ -366,12 +368,14 @@ pub trait EthTransactions: LoadTransaction<Provider: BlockReaderIdExt> {
.enumerate()
.find(|(_, (signer, tx))| **signer == sender && (*tx).nonce() == nonce)
.map(|(index, (signer, tx))| {
#[allow(clippy::needless_update)]
let tx_info = TransactionInfo {
hash: Some(*tx.tx_hash()),
block_hash: Some(block_hash),
block_number: Some(block_number),
base_fee: base_fee_per_gas,
index: Some(index as u64),
..Default::default()
};
Ok(self.converter().fill(tx.clone().with_signer(*signer), tx_info)?)
})
@@ -619,7 +623,20 @@ pub trait LoadTransaction: SpawnBlocking + FullEthApiTypes + RpcNodeCoreExt {
Output = Result<Option<TransactionSource<ProviderTx<Self::Provider>>>, Self::Error>,
> + Send {
async move {
// Try to find the transaction on disk
// First, try the RPC cache
if let Some(cached) = self.cache().get_transaction_by_hash(hash).await &&
let Some(tx) = cached.recovered_transaction()
{
return Ok(Some(TransactionSource::Block {
transaction: tx.cloned(),
index: cached.tx_index as u64,
block_hash: cached.block.hash(),
block_number: cached.block.number(),
base_fee: cached.block.base_fee_per_gas(),
}));
}
// Cache miss - try to find the transaction on disk
if let Some((tx, meta)) = self
.spawn_blocking_io(move |this| {
this.provider()

View File

@@ -2,15 +2,68 @@
use std::sync::Arc;
use alloy_consensus::TxReceipt;
use alloy_consensus::{transaction::TxHashRef, TxReceipt};
use alloy_primitives::TxHash;
use reth_primitives_traits::{
BlockTy, IndexedTx, NodePrimitives, ReceiptTy, RecoveredBlock, SealedBlock,
Block, BlockBody, BlockTy, IndexedTx, NodePrimitives, ReceiptTy, Recovered, RecoveredBlock,
SealedBlock,
};
use reth_rpc_convert::{transaction::ConvertReceiptInput, RpcConvert, RpcTypes};
use crate::utils::calculate_gas_used_and_next_log_index;
/// Cached data for a transaction lookup.
#[derive(Debug, Clone)]
pub struct CachedTransaction<B: Block, R> {
/// The block containing this transaction.
pub block: Arc<RecoveredBlock<B>>,
/// Index of the transaction within the block.
pub tx_index: usize,
/// Receipts for the block, if available.
pub receipts: Option<Arc<Vec<R>>>,
}
impl<B: Block, R> CachedTransaction<B, R> {
/// Creates a new cached transaction entry.
pub const fn new(
block: Arc<RecoveredBlock<B>>,
tx_index: usize,
receipts: Option<Arc<Vec<R>>>,
) -> Self {
Self { block, tx_index, receipts }
}
/// Returns the `Recovered<&T>` transaction at the cached index.
pub fn recovered_transaction(&self) -> Option<Recovered<&<B::Body as BlockBody>::Transaction>> {
self.block.recovered_transaction(self.tx_index)
}
/// Converts this cached transaction into an RPC receipt using the given converter.
///
/// Returns `None` if receipts are not available or the transaction index is out of bounds.
pub fn into_receipt<N, C>(
self,
converter: &C,
) -> Option<Result<<C::Network as RpcTypes>::Receipt, C::Error>>
where
N: NodePrimitives<Block = B, Receipt = R>,
R: TxReceipt + Clone,
C: RpcConvert<Primitives = N>,
{
let receipts = self.receipts?;
let receipt = receipts.get(self.tx_index)?;
let tx_hash = *self.block.body().transactions().get(self.tx_index)?.tx_hash();
let tx = self.block.find_indexed(tx_hash)?;
convert_transaction_receipt::<N, C>(
self.block.as_ref(),
receipts.as_ref(),
tx,
receipt,
converter,
)
}
}
/// A pair of an [`Arc`] wrapped [`RecoveredBlock`] and its corresponding receipts.
///
/// This type is used throughout the RPC layer to efficiently pass around

View File

@@ -4,7 +4,7 @@ use serde::{Deserialize, Serialize};
use reth_rpc_server_types::constants::cache::{
DEFAULT_BLOCK_CACHE_MAX_LEN, DEFAULT_CONCURRENT_DB_REQUESTS, DEFAULT_HEADER_CACHE_MAX_LEN,
DEFAULT_RECEIPT_CACHE_MAX_LEN,
DEFAULT_MAX_CACHED_TX_HASHES, DEFAULT_RECEIPT_CACHE_MAX_LEN,
};
/// Settings for the [`EthStateCache`](super::EthStateCache).
@@ -27,6 +27,8 @@ pub struct EthStateCacheConfig {
///
/// Default is 512.
pub max_concurrent_db_requests: usize,
/// Maximum number of transaction hashes to cache for transaction lookups.
pub max_cached_tx_hashes: u32,
}
impl Default for EthStateCacheConfig {
@@ -36,6 +38,7 @@ impl Default for EthStateCacheConfig {
max_receipts: DEFAULT_RECEIPT_CACHE_MAX_LEN,
max_headers: DEFAULT_HEADER_CACHE_MAX_LEN,
max_concurrent_db_requests: DEFAULT_CONCURRENT_DB_REQUESTS,
max_cached_tx_hashes: DEFAULT_MAX_CACHED_TX_HASHES,
}
}
}

View File

@@ -1,17 +1,18 @@
//! Async caching support for eth RPC
use super::{EthStateCacheConfig, MultiConsumerLruCache};
use alloy_consensus::BlockHeader;
use crate::block::CachedTransaction;
use alloy_consensus::{transaction::TxHashRef, BlockHeader};
use alloy_eips::BlockHashOrNumber;
use alloy_primitives::B256;
use alloy_primitives::{TxHash, B256};
use futures::{stream::FuturesOrdered, Stream, StreamExt};
use reth_chain_state::CanonStateNotification;
use reth_errors::{ProviderError, ProviderResult};
use reth_execution_types::Chain;
use reth_primitives_traits::{Block, NodePrimitives, RecoveredBlock};
use reth_primitives_traits::{Block, BlockBody, NodePrimitives, RecoveredBlock};
use reth_storage_api::{BlockReader, TransactionVariant};
use reth_tasks::{TaskSpawner, TokioTaskExecutor};
use schnellru::{ByLength, Limiter};
use schnellru::{ByLength, Limiter, LruMap};
use std::{
future::Future,
pin::Pin,
@@ -47,6 +48,9 @@ type HeaderResponseSender<H> = oneshot::Sender<ProviderResult<H>>;
/// The type that can send the response with a chain of cached blocks
type CachedParentBlocksResponseSender<B> = oneshot::Sender<Vec<Arc<RecoveredBlock<B>>>>;
/// The type that can send the response for a transaction hash lookup
type TransactionHashResponseSender<B, R> = oneshot::Sender<Option<CachedTransaction<B, R>>>;
type BlockLruCache<B, L> =
MultiConsumerLruCache<B256, Arc<RecoveredBlock<B>>, L, BlockWithSendersResponseSender<B>>;
@@ -79,11 +83,13 @@ impl<N: NodePrimitives> EthStateCache<N> {
max_receipts: u32,
max_headers: u32,
max_concurrent_db_operations: usize,
max_cached_tx_hashes: u32,
) -> (Self, EthStateCacheService<Provider, Tasks>)
where
Provider: BlockReader<Block = N::Block, Receipt = N::Receipt>,
{
let (to_service, rx) = unbounded_channel();
let service = EthStateCacheService {
provider,
full_block_cache: BlockLruCache::new(max_blocks, "blocks"),
@@ -93,6 +99,7 @@ impl<N: NodePrimitives> EthStateCache<N> {
action_rx: UnboundedReceiverStream::new(rx),
action_task_spawner,
rate_limiter: Arc::new(Semaphore::new(max_concurrent_db_operations)),
tx_hash_index: LruMap::new(ByLength::new(max_cached_tx_hashes)),
};
let cache = Self { to_service };
(cache, service)
@@ -127,6 +134,7 @@ impl<N: NodePrimitives> EthStateCache<N> {
max_receipts,
max_headers,
max_concurrent_db_requests,
max_cached_tx_hashes,
} = config;
let (this, service) = Self::create(
provider,
@@ -135,6 +143,7 @@ impl<N: NodePrimitives> EthStateCache<N> {
max_receipts,
max_headers,
max_concurrent_db_requests,
max_cached_tx_hashes,
);
executor.spawn_critical("eth state cache", Box::pin(service));
this
@@ -255,6 +264,19 @@ impl<N: NodePrimitives> EthStateCache<N> {
Some(blocks)
}
}
/// Looks up a transaction by its hash in the cache index.
///
/// Returns the cached block, transaction index, and optionally receipts if the transaction
/// is in a cached block.
pub async fn get_transaction_by_hash(
&self,
tx_hash: TxHash,
) -> Option<CachedTransaction<N::Block, N::Receipt>> {
let (response_tx, rx) = oneshot::channel();
let _ = self.to_service.send(CacheAction::GetTransactionByHash { tx_hash, response_tx });
rx.await.ok()?
}
}
/// Thrown when the cache service task dropped.
#[derive(Debug, thiserror::Error)]
@@ -317,6 +339,8 @@ pub(crate) struct EthStateCacheService<
///
/// This restricts the max concurrent fetch tasks at the same time.
rate_limiter: Arc<Semaphore>,
/// LRU index mapping transaction hashes to their block hash and index within the block.
tx_hash_index: LruMap<TxHash, (B256, usize), ByLength>,
}
impl<Provider, Tasks> EthStateCacheService<Provider, Tasks>
@@ -324,6 +348,29 @@ where
Provider: BlockReader + Clone + Unpin + 'static,
Tasks: TaskSpawner + Clone + 'static,
{
/// Indexes all transactions in a block by transaction hash.
fn index_block_transactions(&mut self, block: &RecoveredBlock<Provider::Block>) {
let block_hash = block.hash();
for (tx_idx, tx) in block.body().transactions().iter().enumerate() {
self.tx_hash_index.insert(*tx.tx_hash(), (block_hash, tx_idx));
}
}
/// Removes transaction index entries for a reorged block.
///
/// Only removes entries that still point to this block, preserving mappings for transactions
/// that were re-mined in a new canonical block.
fn remove_block_transactions(&mut self, block: &RecoveredBlock<Provider::Block>) {
let block_hash = block.hash();
for tx in block.body().transactions() {
if let Some((mapped_hash, _)) = self.tx_hash_index.get(tx.tx_hash()) &&
*mapped_hash == block_hash
{
self.tx_hash_index.remove(tx.tx_hash());
}
}
}
fn on_new_block(
&mut self,
block_hash: B256,
@@ -550,6 +597,8 @@ where
}
CacheAction::CacheNewCanonicalChain { chain_change } => {
for block in chain_change.blocks {
// Index transactions before caching the block
this.index_block_transactions(&block);
this.on_new_block(block.hash(), Ok(Some(Arc::new(block))));
}
@@ -562,6 +611,8 @@ where
}
CacheAction::RemoveReorgedChain { chain_change } => {
for block in chain_change.blocks {
// Remove transaction index entries for reorged blocks
this.remove_block_transactions(&block);
this.on_reorg_block(block.hash(), Ok(Some(block)));
}
@@ -596,6 +647,15 @@ where
let _ = response_tx.send(blocks);
}
CacheAction::GetTransactionByHash { tx_hash, response_tx } => {
let result =
this.tx_hash_index.get(&tx_hash).and_then(|(block_hash, idx)| {
let block = this.full_block_cache.get(block_hash).cloned()?;
let receipts = this.receipts_cache.get(block_hash).cloned();
Some(CachedTransaction::new(block, *idx, receipts))
});
let _ = response_tx.send(result);
}
};
this.update_cached_metrics();
}
@@ -649,6 +709,11 @@ enum CacheAction<B: Block, R> {
max_blocks: usize,
response_tx: CachedParentBlocksResponseSender<B>,
},
/// Look up a transaction's cached data by its hash
GetTransactionByHash {
tx_hash: TxHash,
response_tx: TransactionHashResponseSender<B, R>,
},
}
struct BlockReceipts<R> {

View File

@@ -27,6 +27,7 @@ pub mod tx_forward;
pub mod utils;
pub use alloy_rpc_types_eth::FillTransaction;
pub use block::CachedTransaction;
pub use builder::config::{EthConfig, EthFilterConfig};
pub use cache::{
config::EthStateCacheConfig, db::StateCacheDb, multi_consumer::MultiConsumerLruCache,

View File

@@ -6,9 +6,11 @@ use crate::{
};
use alloy_consensus::{transaction::TxHashRef, BlockHeader, Transaction as _};
use alloy_eips::eip2718::WithEncoded;
use alloy_evm::precompiles::PrecompilesMap;
use alloy_network::TransactionBuilder;
use alloy_rpc_types_eth::{
simulate::{SimCallResult, SimulateError, SimulatedBlock},
state::StateOverride,
BlockTransactionsKind,
};
use jsonrpsee_types::ErrorObject;
@@ -27,6 +29,16 @@ use revm::{
Database,
};
/// Error code for execution reverted in `eth_simulateV1`.
///
/// <https://github.com/ethereum/execution-apis>
pub const SIMULATE_REVERT_CODE: i32 = -32000;
/// Error code for VM execution errors (e.g., out of gas) in `eth_simulateV1`.
///
/// <https://github.com/ethereum/execution-apis>
pub const SIMULATE_VM_ERROR_CODE: i32 = -32015;
/// Errors which may occur during `eth_simulateV1` execution.
#[derive(Debug, thiserror::Error)]
pub enum EthSimulateError {
@@ -37,11 +49,21 @@ pub enum EthSimulateError {
#[error("Client adjustable limit reached")]
GasLimitReached,
/// Block number in sequence did not increase.
#[error("Block number in sequence did not increase")]
BlockNumberInvalid,
/// Block timestamp in sequence did not increase or stay the same.
#[error("Block timestamp in sequence did not increase")]
BlockTimestampInvalid,
#[error("block numbers must be in order: {got} <= {parent}")]
BlockNumberInvalid {
/// The block number that was provided.
got: u64,
/// The parent block number.
parent: u64,
},
/// Block timestamp in sequence did not increase.
#[error("block timestamps must be in order: {got} <= {parent}")]
BlockTimestampInvalid {
/// The block timestamp that was provided.
got: u64,
/// The parent block timestamp.
parent: u64,
},
/// Transaction nonce is too low.
#[error("nonce too low: next nonce {state}, tx nonce {tx}")]
NonceTooLow {
@@ -79,6 +101,9 @@ pub enum EthSimulateError {
/// Multiple `MovePrecompileToAddress` referencing the same address.
#[error("Multiple MovePrecompileToAddress referencing the same address")]
PrecompileDuplicateAddress,
/// Attempted to move a non-precompile address.
#[error("account {0} is not a precompile")]
NotAPrecompile(Address),
}
impl EthSimulateError {
@@ -91,13 +116,14 @@ impl EthSimulateError {
Self::IntrinsicGasTooLow => -38013,
Self::InsufficientFunds { .. } => -38014,
Self::BlockGasLimitExceeded => -38015,
Self::BlockNumberInvalid => -38020,
Self::BlockTimestampInvalid => -38021,
Self::BlockNumberInvalid { .. } => -38020,
Self::BlockTimestampInvalid { .. } => -38021,
Self::PrecompileSelfReference => -38022,
Self::PrecompileDuplicateAddress => -38023,
Self::SenderNotEOA => -38024,
Self::MaxInitCodeSizeExceeded => -38025,
Self::GasLimitReached => -38026,
Self::NotAPrecompile(_) => -32000,
}
}
}
@@ -108,6 +134,76 @@ impl ToRpcError for EthSimulateError {
}
}
/// Applies precompile move overrides from state overrides to the EVM's precompiles map.
///
/// This function processes `movePrecompileToAddress` entries from the state overrides and
/// moves precompiles from their original addresses to new addresses. The original address
/// is cleared (precompile removed) and the precompile is installed at the destination address.
///
/// # Validation
///
/// - The source address must be a precompile (exists in the precompiles map)
/// - Moving multiple precompiles to the same destination is allowed
/// - Self-references (moving to the same address) are not explicitly forbidden here since that
/// would be a no-op
///
/// # Arguments
///
/// * `state_overrides` - The state overrides containing potential `movePrecompileToAddress` entries
/// * `precompiles` - Mutable reference to the EVM's precompiles map
///
/// # Returns
///
/// Returns `Ok(())` on success, or an `EthSimulateError::NotAPrecompile` if a source address
/// is not a precompile.
pub fn apply_precompile_overrides(
state_overrides: &StateOverride,
precompiles: &mut PrecompilesMap,
) -> Result<(), EthSimulateError> {
use alloy_evm::precompiles::DynPrecompile;
let moves: Vec<_> = state_overrides
.iter()
.filter_map(|(source, account_override)| {
account_override.move_precompile_to.map(|dest| (*source, dest))
})
.collect();
if moves.is_empty() {
return Ok(());
}
for (source, _) in &moves {
if precompiles.get(source).is_none() {
return Err(EthSimulateError::NotAPrecompile(*source));
}
}
let mut extracted: Vec<(Address, Address, DynPrecompile)> = Vec::with_capacity(moves.len());
for (source, dest) in moves {
if source == dest {
continue;
}
let mut found_precompile: Option<DynPrecompile> = None;
precompiles.apply_precompile(&source, |existing| {
found_precompile = existing;
None
});
if let Some(precompile) = found_precompile {
extracted.push((source, dest, precompile));
}
}
for (_, dest, precompile) in extracted {
precompiles.apply_precompile(&dest, |_| Some(precompile));
}
Ok(())
}
/// Converts all [`TransactionRequest`]s into [`Recovered`] transactions and applies them to the
/// given [`BlockExecutor`].
///
@@ -263,7 +359,7 @@ where
return_data: Bytes::new(),
error: Some(SimulateError {
message: error.to_string(),
code: error.into().code(),
code: SIMULATE_VM_ERROR_CODE,
..SimulateError::invalid_params()
}),
gas_used,
@@ -278,7 +374,7 @@ where
return_data: output,
error: Some(SimulateError {
message: error.to_string(),
code: error.into().code(),
code: SIMULATE_REVERT_CODE,
..SimulateError::invalid_params()
}),
gas_used,
@@ -299,6 +395,7 @@ where
log_index: Some(log_index - 1),
transaction_index: Some(index as u64),
transaction_hash: Some(*tx.tx_hash()),
block_hash: Some(block.hash()),
block_number: Some(block.header().number()),
block_timestamp: Some(block.header().timestamp()),
..Default::default()

View File

@@ -49,12 +49,14 @@ impl<T: SignedTransaction> TransactionSource<T> {
match self {
Self::Pool(tx) => resp_builder.fill_pending(tx),
Self::Block { transaction, index, block_hash, block_number, base_fee } => {
#[allow(clippy::needless_update)]
let tx_info = TransactionInfo {
hash: Some(transaction.trie_hash()),
index: Some(index),
block_hash: Some(block_hash),
block_number: Some(block_number),
base_fee,
..Default::default()
};
resp_builder.fill(transaction, tx_info)
@@ -69,6 +71,7 @@ impl<T: SignedTransaction> TransactionSource<T> {
let hash = tx.trie_hash();
(tx, TransactionInfo { hash: Some(hash), ..Default::default() })
}
#[allow(clippy::needless_update)]
Self::Block { transaction, index, block_hash, block_number, base_fee } => {
let hash = transaction.trie_hash();
(
@@ -79,6 +82,7 @@ impl<T: SignedTransaction> TransactionSource<T> {
block_hash: Some(block_hash),
block_number: Some(block_number),
base_fee,
..Default::default()
},
)
}

View File

@@ -132,4 +132,7 @@ pub mod cache {
/// Default number of concurrent database requests.
pub const DEFAULT_CONCURRENT_DB_REQUESTS: usize = 512;
/// Default maximum number of transaction hashes to cache for lookups.
pub const DEFAULT_MAX_CACHED_TX_HASHES: u32 = 30_000;
}

View File

@@ -99,7 +99,7 @@ impl AccountHashingStage {
// Account State generator
let mut account_cursor =
provider.tx_ref().cursor_write::<tables::PlainAccountState>()?;
accounts.sort_by(|a, b| a.0.cmp(&b.0));
accounts.sort_by_key(|a| a.0);
for (addr, acc) in &accounts {
account_cursor.append(*addr, acc)?;
}

View File

@@ -1,6 +1,8 @@
use super::collect_account_history_indices;
use crate::stages::utils::{collect_history_indices, load_account_history};
use reth_config::config::{EtlConfig, IndexHistoryConfig};
#[cfg(all(unix, feature = "rocksdb"))]
use reth_db_api::Tables;
use reth_db_api::{models::ShardedKey, tables, transaction::DbTxMut};
use reth_provider::{
DBProvider, EitherWriter, HistoryWriter, PruneCheckpointReader, PruneCheckpointWriter,
@@ -111,7 +113,6 @@ where
// but this is safe for first_sync because if we crash before commit, the
// checkpoint stays at 0 and we'll just clear and rebuild again on restart. The
// source data (changesets) is intact.
#[cfg(all(unix, feature = "rocksdb"))]
provider.rocksdb_provider().clear::<tables::AccountsHistory>()?;
} else {
provider.tx_ref().clear::<tables::AccountsHistory>()?;
@@ -143,6 +144,12 @@ where
Ok(((), writer.into_raw_rocksdb_batch()))
})?;
#[cfg(all(unix, feature = "rocksdb"))]
if use_rocksdb {
provider.commit_pending_rocksdb_batches()?;
provider.rocksdb_provider().flush(&[Tables::AccountsHistory.name()])?;
}
Ok(ExecOutput { checkpoint: StageCheckpoint::new(*range.end()), done: true })
}

View File

@@ -1,6 +1,8 @@
use super::{collect_history_indices, collect_storage_history_indices};
use crate::{stages::utils::load_storage_history, StageCheckpoint, StageId};
use reth_config::config::{EtlConfig, IndexHistoryConfig};
#[cfg(all(unix, feature = "rocksdb"))]
use reth_db_api::Tables;
use reth_db_api::{
models::{storage_sharded_key::StorageShardedKey, AddressStorageKey, BlockNumberAddress},
tables,
@@ -115,7 +117,6 @@ where
// but this is safe for first_sync because if we crash before commit, the
// checkpoint stays at 0 and we'll just clear and rebuild again on restart. The
// source data (changesets) is intact.
#[cfg(all(unix, feature = "rocksdb"))]
provider.rocksdb_provider().clear::<tables::StoragesHistory>()?;
} else {
provider.tx_ref().clear::<tables::StoragesHistory>()?;
@@ -147,6 +148,12 @@ where
Ok(((), writer.into_raw_rocksdb_batch()))
})?;
#[cfg(all(unix, feature = "rocksdb"))]
if use_rocksdb {
provider.commit_pending_rocksdb_batches()?;
provider.rocksdb_provider().flush(&[Tables::StoragesHistory.name()])?;
}
Ok(ExecOutput { checkpoint: StageCheckpoint::new(*range.end()), done: true })
}

View File

@@ -10,6 +10,7 @@ use reth_prune::{
use reth_stages_api::{
ExecInput, ExecOutput, Stage, StageCheckpoint, StageError, StageId, UnwindInput, UnwindOutput,
};
use reth_storage_api::{ChangeSetReader, StorageChangeSetReader};
use tracing::info;
/// The prune stage that runs the pruner with the provided prune modes.
@@ -46,7 +47,9 @@ where
+ StageCheckpointReader
+ StaticFileProviderFactory<
Primitives: NodePrimitives<SignedTx: Value, Receipt: Value, BlockHeader: Value>,
> + StorageSettingsCache,
> + StorageSettingsCache
+ ChangeSetReader
+ StorageChangeSetReader,
{
fn id(&self) -> StageId {
StageId::Prune
@@ -151,7 +154,9 @@ where
+ StageCheckpointReader
+ StaticFileProviderFactory<
Primitives: NodePrimitives<SignedTx: Value, Receipt: Value, BlockHeader: Value>,
> + StorageSettingsCache,
> + StorageSettingsCache
+ ChangeSetReader
+ StorageChangeSetReader,
{
fn id(&self) -> StageId {
StageId::PruneSenderRecovery

View File

@@ -2,6 +2,8 @@ use alloy_eips::eip2718::Encodable2718;
use alloy_primitives::{TxHash, TxNumber};
use num_traits::Zero;
use reth_config::config::{EtlConfig, TransactionLookupConfig};
#[cfg(all(unix, feature = "rocksdb"))]
use reth_db_api::Tables;
use reth_db_api::{
table::{Decode, Decompress, Value},
tables,
@@ -200,6 +202,12 @@ where
}
}
#[cfg(all(unix, feature = "rocksdb"))]
if provider.cached_storage_settings().transaction_hash_numbers_in_rocksdb {
provider.commit_pending_rocksdb_batches()?;
provider.rocksdb_provider().flush(&[Tables::TransactionHashNumbers.name()])?;
}
Ok(ExecOutput {
checkpoint: StageCheckpoint::new(input.target())
.with_entities_stage_checkpoint(stage_checkpoint(provider)?),

View File

@@ -11,7 +11,7 @@ use reth_db_api::{
common::KeyValue,
cursor::{DbCursorRO, DbCursorRW, DbDupCursorRO},
database::Database,
models::{AccountBeforeTx, StoredBlockBodyIndices},
models::{AccountBeforeTx, StorageBeforeTx, StoredBlockBodyIndices},
table::Table,
tables,
transaction::{DbTx, DbTxMut},
@@ -38,15 +38,17 @@ use tempfile::TempDir;
pub struct TestStageDB {
pub factory: ProviderFactory<MockNodeTypesWithDB>,
pub temp_static_files_dir: TempDir,
pub temp_rocksdb_dir: TempDir,
}
impl Default for TestStageDB {
/// Create a new instance of [`TestStageDB`]
fn default() -> Self {
let (static_dir, static_dir_path) = create_test_static_files_dir();
let (_, rocksdb_dir_path) = create_test_rocksdb_dir();
let (rocksdb_dir, rocksdb_dir_path) = create_test_rocksdb_dir();
Self {
temp_static_files_dir: static_dir,
temp_rocksdb_dir: rocksdb_dir,
factory: ProviderFactory::new(
create_test_rw_db(),
MAINNET.clone(),
@@ -61,10 +63,11 @@ impl Default for TestStageDB {
impl TestStageDB {
pub fn new(path: &Path) -> Self {
let (static_dir, static_dir_path) = create_test_static_files_dir();
let (_, rocksdb_dir_path) = create_test_rocksdb_dir();
let (rocksdb_dir, rocksdb_dir_path) = create_test_rocksdb_dir();
Self {
temp_static_files_dir: static_dir,
temp_rocksdb_dir: rocksdb_dir,
factory: ProviderFactory::new(
create_test_rw_db_with_path(path),
MAINNET.clone(),
@@ -473,6 +476,51 @@ impl TestStageDB {
})
}
/// Insert collection of [`ChangeSet`] into static files (account and storage changesets).
pub fn insert_changesets_to_static_files<I>(
&self,
changesets: I,
block_offset: Option<u64>,
) -> ProviderResult<()>
where
I: IntoIterator<Item = ChangeSet>,
{
let offset = block_offset.unwrap_or_default();
let static_file_provider = self.factory.static_file_provider();
let mut account_changeset_writer =
static_file_provider.latest_writer(StaticFileSegment::AccountChangeSets)?;
let mut storage_changeset_writer =
static_file_provider.latest_writer(StaticFileSegment::StorageChangeSets)?;
for (block, changeset) in changesets.into_iter().enumerate() {
let block_number = offset + block as u64;
let mut account_changesets = Vec::new();
let mut storage_changesets = Vec::new();
for (address, old_account, old_storage) in changeset {
account_changesets.push(AccountBeforeTx { address, info: Some(old_account) });
for entry in old_storage {
storage_changesets.push(StorageBeforeTx {
address,
key: entry.key,
value: entry.value,
});
}
}
account_changeset_writer.append_account_changeset(account_changesets, block_number)?;
storage_changeset_writer.append_storage_changeset(storage_changesets, block_number)?;
}
account_changeset_writer.commit()?;
storage_changeset_writer.commit()?;
Ok(())
}
pub fn insert_history<I>(&self, changesets: I, _block_offset: Option<u64>) -> ProviderResult<()>
where
I: IntoIterator<Item = ChangeSet>,

View File

@@ -11,7 +11,7 @@ use reth_trie_common::{HashedPostState, Nibbles, TRIE_ACCOUNT_RLP_MAX_SIZE};
use reth_trie_sparse::{
errors::SparseStateTrieResult,
provider::{DefaultTrieNodeProvider, DefaultTrieNodeProviderFactory},
SparseStateTrie, SparseTrie, SparseTrieInterface,
RevealableSparseTrie, SparseStateTrie, SparseTrie,
};
/// Trait for stateless trie implementations that can be used for stateless validation.
@@ -245,7 +245,7 @@ fn calculate_state_root(
for (address, storage) in state.storages.into_iter().sorted_unstable_by_key(|(addr, _)| *addr) {
// Take the existing storage trie (or create an empty, “revealed” one)
let mut storage_trie =
trie.take_storage_trie(&address).unwrap_or_else(SparseTrie::revealed_empty);
trie.take_storage_trie(&address).unwrap_or_else(RevealableSparseTrie::revealed_empty);
if storage.wiped {
storage_trie.wipe()?;

View File

@@ -115,6 +115,8 @@ pub enum DatabaseWriteOperation {
PutUpsert,
/// Put append.
PutAppend,
/// Flush to disk.
Flush,
}
/// Database log level.

View File

@@ -339,26 +339,31 @@ where
fn drop(&mut self) {
// To be able to abort a timed out transaction, we need to renew it first.
// Hence the usage of `txn_execute_renew_on_timeout` here.
self.txn
.txn_execute_renew_on_timeout(|txn| {
if !self.has_committed() {
if K::IS_READ_ONLY {
#[cfg(feature = "read-tx-timeouts")]
self.env.txn_manager().remove_active_read_transaction(txn);
//
// We intentionally ignore errors here because Drop should never panic.
// MDBX can return errors (e.g., MDBX_PANIC) during abort if the environment
// is in a fatal state, but panicking in Drop can cause double-panics during
// unwinding which terminates the process.
let _ = self.txn.txn_execute_renew_on_timeout(|txn| {
if !self.has_committed() {
if K::IS_READ_ONLY {
#[cfg(feature = "read-tx-timeouts")]
self.env.txn_manager().remove_active_read_transaction(txn);
unsafe {
ffi::mdbx_txn_abort(txn);
}
} else {
let (sender, rx) = sync_channel(0);
self.env
.txn_manager()
.send_message(TxnManagerMessage::Abort { tx: TxnPtr(txn), sender });
rx.recv().unwrap().unwrap();
unsafe {
ffi::mdbx_txn_abort(txn);
}
} else {
let (sender, rx) = sync_channel(0);
self.env
.txn_manager()
.send_message(TxnManagerMessage::Abort { tx: TxnPtr(txn), sender });
if let Ok(Err(e)) = rx.recv() {
tracing::error!(target: "libmdbx", %e, "failed to abort transaction in drop");
}
}
})
.unwrap();
}
});
}
}

View File

@@ -186,6 +186,11 @@ impl<N: ProviderNodeTypes> RocksDBProviderFactory for BlockchainProvider<N> {
fn set_pending_rocksdb_batch(&self, _batch: rocksdb::WriteBatchWithTransaction<true>) {
unimplemented!("BlockchainProvider wraps ProviderFactory - use DatabaseProvider::set_pending_rocksdb_batch instead")
}
#[cfg(all(unix, feature = "rocksdb"))]
fn commit_pending_rocksdb_batches(&self) -> ProviderResult<()> {
unimplemented!("BlockchainProvider wraps ProviderFactory - use DatabaseProvider::commit_pending_rocksdb_batches instead")
}
}
impl<N: ProviderNodeTypes> HeaderProvider for BlockchainProvider<N> {

Some files were not shown because too many files have changed in this diff Show More