Compare commits

...

95 Commits

Author SHA1 Message Date
Tempo Agent
b9dbe5adea fix(db): return error when MDBX commit is aborted
MDBX's transaction commit returns MDBX_RESULT_TRUE (-1) when the
transaction was aborted due to previous errors. The Rust wrapper
converts this to Ok(true), but reth's DbTx::commit() implementation
was passing this through as a successful result.

Since commit() is typically used as `provider_rw.commit()?` throughout
the codebase, which only checks for Err variants, these silent failures
were going unnoticed - potentially leading to data loss.

This commit:
- Changes DbTx::commit() return type from Result<bool, _> to Result<(), _>
- Maps MDBX_RESULT_TRUE to a proper error in the mdbx tx implementation
- Updates all trait implementations to match the new signature

The bool return value was not useful anyway since it only indicated
whether the commit was a no-op (MDBX_RESULT_TRUE = aborted) vs real
success (MDBX_SUCCESS/false), and callers never checked it.

See: https://libmdbx.dqdkfa.ru/group__c__transactions.html#ga3146a36cf1d0603cc6e3d52e97ec38a9
Amp-Thread-ID: https://ampcode.com/threads/T-019bbe8c-b7b0-722a-8f37-7d4c19682b06
Co-authored-by: Amp <amp@ampcode.com>
2026-01-14 22:25:06 +00:00
Tempo Agent
596a95fc04 fix(storage): handle empty static files in increment_block
When the static file is empty and the expected block is beyond
expected_block_start, advance to that block first. This can happen when
enabling static files on an existing node where the first block to write
is not at the segment boundary (e.g., enabling account_changesets at
block 5,000,001 when the segment starts at 5,000,000).

This fixes UnexpectedStaticFileBlockNumber errors for account changesets
and other block-based segments.
2026-01-14 22:17:56 +00:00
Tempo Agent
5159f40452 fix(storage): use expected_block_start in ensure_at_block
When the static file is empty, ensure_at_block was hardcoding block 0
as the starting block. This fails when the writer was created for a
later block range (e.g., block 5,000,000+) because expected_block_start
would be much higher than 0.

Use expected_block_start() instead of hardcoded 0 to properly handle
empty static files in any block range.
2026-01-14 22:11:31 +00:00
Tempo Agent
7f0a6a67b1 fix(storage): skip invariants check for empty static files
When a static file is truly empty (never had data), skip the invariants
check entirely. This allows enabling static files for the first time on
an existing node without triggering false corruption detection.
2026-01-14 22:10:45 +00:00
Tempo Agent
d2a43a9288 refactor(storage): remove gap check between database and static files
Data types no longer exist simultaneously in static files and database.
Headers, transactions, etc. are only written to static files now, never
to the database. This makes the continuity gap check unnecessary.

Closes #19721
2026-01-14 22:10:11 +00:00
YK
369c629b9b perf(trie): reuse overlay in deferred trie overlay computation (#20774) 2026-01-12 15:04:26 +00:00
GarmashAlex
6fec4603cf refactor(trie): avoid building prefix set for v2 storage proofs (#20898) 2026-01-12 12:49:24 +00:00
DaniPopes
515fd597f3 perf(net): use alloy_primitives::Keccak256 (#20957) 2026-01-12 11:21:27 +00:00
Crypto Nomad
126a7c9570 perf(engine): save one clock read in sparse trie metrics (#20947) 2026-01-12 07:40:30 +00:00
Matthias Seitz
8aeee5018e perf(trie): save one clock read in elapsed time calculation (#20916) 2026-01-12 03:57:54 +00:00
Matthias Seitz
210309ca76 docs: fix typos and incorrect documentation (#20943) 2026-01-12 00:48:01 +01:00
Matthias Seitz
551918b0d8 refactor(engine): defer sparse trie setup to spawned task (#20942) 2026-01-11 23:30:14 +00:00
iPLAY888
89677e1bd9 docs(rpc): fix incorrect transport in with_ipc comment (#20939) 2026-01-11 23:04:32 +00:00
pepes
0e2b3afa3f chore: correct deprecation message for SealedBlockFor (#20929) 2026-01-11 15:08:25 +00:00
David Klank
5d551eab29 perf(payload): remove unnecessary parent_header clone (#20930) 2026-01-11 15:07:51 +00:00
David Klank
12c4c04f7d fix(optimism): add missing Holocene hardfork to DEV_HARDFORKS (#20931) 2026-01-11 15:03:35 +00:00
Matthias Seitz
392f8e6e13 refactor(engine): simplify is_done signature in MultiProofTask (#20906) 2026-01-11 09:46:20 +00:00
Crypto Nomad
1a94d1f091 docs: fix re-export source comments (#20913) 2026-01-10 15:36:03 +00:00
viktorking7
97ae89c7f0 docs: fix dead link (#20914) 2026-01-10 15:18:56 +00:00
Matthias Seitz
a4921119e4 perf(trie): defer consuming remaining storage proof receivers (#20915) 2026-01-10 15:17:20 +00:00
VolodymyrBg
0f3d3695f5 docs: document account_change_sets static files config (#20903) 2026-01-10 09:02:42 +00:00
phrwlk
54355dfc78 docs: fix Performant card link on landing page (#20904) 2026-01-10 08:54:58 +00:00
FT
44a6035fa3 fix: correct typos in error messages and logs (#20894) 2026-01-10 08:54:31 +00:00
Matthias Seitz
746baed2b1 feat(cli): add CliRunnerConfig for configurable graceful shutdown timeout (#20899)
Co-authored-by: Claude Opus 4.5 <noreply@anthropic.com>
2026-01-09 21:52:03 +00:00
Dan Cline
e86c5fba53 fix(stages): advance sender static file in sender recovery (#20897) 2026-01-09 20:23:17 +00:00
joshieDo
485fa3448d fix: call cancel_all_background_work on RocksDBProviderInner drop (#20895) 2026-01-09 19:53:31 +00:00
DaniPopes
0db3813941 fix(rbc): fail early if node exits while waiting for startup (#20892) 2026-01-09 17:58:04 +00:00
FT
52c2ae3362 docs: fix typos in documentation files (#20890)
Co-authored-by: Matthias Seitz <matthias.seitz@outlook.de>
2026-01-09 18:41:37 +01:00
YK
b1d75f2771 feat(bench-compare): add --wait-for-persistence flag support (#20891) 2026-01-09 16:47:46 +00:00
Matthias Seitz
ef80ee1687 chore: remove env clone (#20889) 2026-01-09 16:42:50 +00:00
radik878
8dacfb3d9c refactor(ecies): avoid duplicate keccak digest in MAC::update_body (#20854) 2026-01-09 15:35:51 +00:00
joshieDo
425a021e3b feat: add edge feature flag to reth (#20841) 2026-01-09 15:33:21 +00:00
Hwangjae Lee
08c0d30ea7 docs(reth): fix outdated comments and document missing features (#20849)
Signed-off-by: Hwangjae Lee <meetrick@gmail.com>
2026-01-09 15:32:17 +00:00
かりんとう
84e970e4c9 perf: remove redundant contains_key (#20820) 2026-01-09 15:22:06 +00:00
Fibonacci747
020f20db42 chore: correct StorageHistory prune map size constant name (#20828) 2026-01-09 15:20:02 +00:00
ANtutov
f53929e0c8 docs: clarify bodies downloader set_download_range semantics (#20821) 2026-01-09 15:18:37 +00:00
ethfanWilliam
4a8fbe15e3 chore: remove unused implementation (#20885) 2026-01-09 15:08:06 +00:00
yyhrnk
a59e9832e6 docs: document optional block param for trace_rawTransaction (#20812)
Co-authored-by: Matthias Seitz <matthias.seitz@outlook.de>
2026-01-09 15:04:29 +00:00
YK
07beb76cf7 feat(reth-bench-compare): add persistence-based flow optimization for reth-bench (#20869)
Co-authored-by: Alexey Shekhirin <github@shekhirin.com>
2026-01-09 14:58:21 +00:00
FT
3ddf0bd729 docs: correct typo in hive.yml workflow comment (#20884) 2026-01-09 14:50:05 +00:00
iPLAY888
c3d92ddfc2 docs(engine): update outdated EthBuiltPayload comment (#20883) 2026-01-09 14:45:11 +00:00
kurahin
c0628dfbff refactor(config): delegate PruneConfig::has_receipts_pruning (#20809) 2026-01-09 14:44:43 +00:00
Sabnock
a2aa1f18df feat(rpc): add debug_getBlockAccessList endpoint for EIP-7928 (#20824)
Co-authored-by: Matthias Seitz <matthias.seitz@outlook.de>
2026-01-09 13:29:37 +00:00
Arun Dhyani
d489f80f6b feat: Add TrieUpdatesSorted and HashedPostStateSorted in all ExEx notifications (#20333)
Co-authored-by: Brian Picciano <me@mediocregopher.com>
2026-01-09 13:06:41 +00:00
Emilia Hane
bf272c9432 chore(consensus): Add trait object error variant to ConsensusError (#20875)
Co-authored-by: leeli <Leeliren@proton.me>
2026-01-09 13:01:22 +00:00
FT
ebb54d0dcc docs: typo in comment (#20879) 2026-01-09 13:00:22 +00:00
Matthias Seitz
1d7367c389 perf(engine): simplify get_prefetch_proof_targets (#20864)
Co-authored-by: Claude Opus 4.5 <noreply@anthropic.com>
2026-01-09 12:53:28 +00:00
refcell
824ae12d75 feat(exex): Make WAL Block Threshold Configurable (#20867) 2026-01-09 12:45:53 +00:00
Alexey Shekhirin
2db281e51d feat(reth-bench-compare): nP latency mean change percent (#20881) 2026-01-09 11:58:08 +00:00
Brian Picciano
8367ba473e feat(metrics): Add metrics for save_block steps and computed trie input sizes (#20878) 2026-01-09 11:40:35 +00:00
fig
f2abad5f5c perf(engine): destructure leaf to avoid clone() (#20863) 2026-01-09 11:19:49 +00:00
Matthias Seitz
4673d77c03 perf(trie): optimize ChunkedHashedPostState sorting (#20866) 2026-01-09 11:18:28 +00:00
Matthias Seitz
33bcd60348 feat(rpc): add persisted block subscription (#20877)
Co-authored-by: cakevm <cakevm@proton.me>
2026-01-09 10:37:46 +00:00
Matthias Seitz
8a9b5d90f4 feat(chain-state): add persisted block tracking (#20876)
Co-authored-by: cakevm <cakevm@proton.me>
2026-01-09 09:56:20 +00:00
joshieDo
c26cfa3dcb fix: pre-calculate transitions on append_blocks_with_state (#20850) 2026-01-09 09:26:46 +00:00
joshieDo
13e59651f1 fix: initialize transaction-senders sf during genesis (#20846) 2026-01-09 09:26:26 +00:00
Hwangjae Lee
0f4995d1ea chore(trie): fix typo in comment (#20870)
Signed-off-by: Hwangjae Lee <meetrick@gmail.com>
2026-01-09 09:19:14 +00:00
Matthias Seitz
cff7e8be53 perf(engine): avoid unnecessary B256 copy in get_proof_targets (#20845)
Co-authored-by: Claude Opus 4.5 <noreply@anthropic.com>
2026-01-09 04:57:23 +00:00
YK
5433d7a4ac feat(storage): add RocksDB history lookup methods and owned batch type [2/3] (#20543) 2026-01-09 04:52:15 +00:00
fig
1866db4d50 chore(engine): remove unnecessary debug-level clone() (#20862) 2026-01-08 22:21:29 +00:00
Danno Ferrin
c9b92550b6 feat(network): add customizable announcement filtering policy to APIs (#20861)
Co-authored-by: Matthias Seitz <matthias.seitz@outlook.de>
2026-01-08 22:08:41 +00:00
Sebastian Stammler
8e81ebfc1f feat(optimism): Also require non-zero elasticity in payload attributes (#20858) 2026-01-08 21:32:46 +00:00
joshieDo
1363205b5d feat: allow TransactionHashNumbers to be written to rocksdb during live sync (#20853) 2026-01-08 20:02:49 +00:00
DaniPopes
ed201cae0e chore(rbc): improve compilation log message (#20855) 2026-01-08 19:30:04 +00:00
Matthias Seitz
a5b10f11ce perf(engine): handle EmptyProof inline during prefetch batching (#20848)
Co-authored-by: Claude Opus 4.5 <noreply@anthropic.com>
2026-01-08 19:12:14 +00:00
Brian Picciano
a06644944f feat(trie): Keep cached storage roots on proof workers (#20838) 2026-01-08 17:04:42 +00:00
Matthias Seitz
8eecad3d1d chore(engine): remove state update batching in multiproof (#20842)
Co-authored-by: Claude Opus 4.5 <noreply@anthropic.com>
2026-01-08 16:04:12 +00:00
Emilia Hane
412f39e223 chore(consensus): Remove associated type Consensus::Error (#20843)
Co-authored-by: Josh_dfG <126518346+JoshdfG@users.noreply.github.com>
2026-01-08 15:54:31 +00:00
Matthias Seitz
13106233e4 perf(engine): use crossbeam select for persistence events (#20813) 2026-01-08 15:47:50 +00:00
joshieDo
e63fef0e79 ci: rocksdb job to unit.yml (#20839) 2026-01-08 13:20:43 +00:00
Dan Cline
eed34254f5 feat: add StaticFileSegment::AccountChangeSets (#18882)
Co-authored-by: Alexey Shekhirin <github@shekhirin.com>
2026-01-08 12:05:05 +00:00
Emilia Hane
b38d37a1e1 feat(sdk): Add custom TrieType (#20804)
Co-authored-by: jagroot <4516586+itschaindev@users.noreply.github.com>
2026-01-08 11:53:27 +00:00
Maxim Evtush
7efaf4ca97 docs: mention optional EraStage in DefaultStages documentation (#20836) 2026-01-08 11:51:10 +00:00
Emilia Hane
ef708792a9 chore(storage): Add trait object error variant to DatabaseError (#20096) 2026-01-08 11:40:09 +00:00
Alexey Shekhirin
bcd74d021b feat(metrics): configurable jeprof pprof dumps directory (#20834) 2026-01-08 11:21:42 +00:00
bigbear
0f0a181fe2 fix(trie): account for all flag in PrefixSet::is_empty() (#20801) 2026-01-08 11:20:55 +00:00
Matthias Seitz
9678d6c76d chore: tighten iat timeout (#20835) 2026-01-08 11:09:03 +00:00
Brian Picciano
7ceca70353 feat(trie): Add flag to enable proof v2 for storage proof workers (#20617)
Co-authored-by: YK <chiayongkang@hotmail.com>
2026-01-08 10:53:24 +00:00
Matthias Seitz
4412a501eb perf(chain-state): avoid clones in deferred trie computation (#20816) 2026-01-08 09:25:32 +00:00
YK
3ca5cf49b6 refactor(storage): extract shared find_changeset_block_from_index algorithm [1/3] (#20542) 2026-01-08 02:56:38 +00:00
Matthias Seitz
1d4603769f perf(trie): use sorted_unstable for proof target chunking (#20827) 2026-01-08 01:05:14 +00:00
Lorsmirq Benton
9bba8c7a98 docs(net): complete incomplete doc comment (#20793) 2026-01-07 21:16:00 +00:00
Alexey Shekhirin
6f0ef914b9 feat(metrics): jemalloc heap dump endpoint (#20811) 2026-01-07 19:36:08 +00:00
Alexey Shekhirin
d756e8310a chore(engine): more logs when cache is not available (#20817)
Co-authored-by: DaniPopes <57450786+DaniPopes@users.noreply.github.com>
2026-01-07 19:35:27 +00:00
DaniPopes
74a7ba581c feat(rbc): don't wait in between FCUs when warming up (#20818) 2026-01-07 19:20:33 +00:00
Matthias Seitz
a8980bf7c1 chore: ignore RUSTSEC-2026-0002 (#20819) 2026-01-07 18:47:09 +00:00
Matthias Seitz
050d9f440f chore: ignore RUSTSEC-2025-0141 bincode advisory (#20815) 2026-01-07 19:10:30 +01:00
Brian Picciano
df33a8200f feat(reth-bench-compare): Do unwind first (#20808) 2026-01-07 16:49:07 +00:00
Matthias Seitz
d3dab613fc revert: "perf(engine): parellelize multiproof_targets_from_state (#206… (#20807) 2026-01-07 15:49:10 +00:00
Matthias Seitz
1b31739adf revert: "perf(engine): paralellize evm_state_to_hashed_post_state() (#… (#20806) 2026-01-07 15:47:15 +00:00
DaniPopes
6280abedd0 chore(reth-bench-compare): skip last unwind (#20805) 2026-01-07 15:44:36 +00:00
Mohan Somnath
4c064a4d20 docs: fix article and grammar errors in comments (#20794) 2026-01-07 15:00:13 +00:00
phrwlk
8d19a36492 docs: clarify pending pending_block build_block docs (#20800) 2026-01-07 14:09:54 +00:00
cui
78f2685ee9 perf: remove unnecessary code (#20719)
Co-authored-by: weixie.cui <weixie.cui@okg.com>
2026-01-07 12:12:17 +00:00
YK
fee7e997ff refactor(trie): replace TrieMasks with Option<BranchNodeMasks> (#20707) 2026-01-07 11:27:23 +00:00
280 changed files with 9233 additions and 3132 deletions

View File

@@ -28,10 +28,14 @@ jobs:
build:
- name: 'Build and push the nightly reth image'
command: 'make PROFILE=maxperf docker-build-push-nightly'
- name: 'Build and push the nightly edge profiling reth image'
command: 'make PROFILE=profiling docker-build-push-nightly-edge-profiling'
- name: 'Build and push the nightly profiling reth image'
command: 'make PROFILE=profiling docker-build-push-nightly-profiling'
- name: 'Build and push the nightly op-reth image'
command: 'make IMAGE_NAME=$OP_IMAGE_NAME DOCKER_IMAGE_NAME=$OP_DOCKER_IMAGE_NAME PROFILE=maxperf op-docker-build-push-nightly'
- name: 'Build and push the nightly edge profiling op-reth image'
command: 'make IMAGE_NAME=$OP_IMAGE_NAME DOCKER_IMAGE_NAME=$OP_DOCKER_IMAGE_NAME PROFILE=profiling op-docker-build-push-nightly-edge-profiling'
- name: 'Build and push the nightly profiling op-reth image'
command: 'make IMAGE_NAME=$OP_IMAGE_NAME DOCKER_IMAGE_NAME=$OP_DOCKER_IMAGE_NAME PROFILE=profiling op-docker-build-push-nightly-profiling'
steps:

View File

@@ -15,11 +15,21 @@ concurrency:
cancel-in-progress: true
jobs:
prepare-reth:
prepare-reth-stable:
uses: ./.github/workflows/prepare-reth.yml
with:
image_tag: ghcr.io/paradigmxyz/reth:latest
binary_name: reth
cargo_features: "asm-keccak"
artifact_name: "reth-stable"
prepare-reth-edge:
uses: ./.github/workflows/prepare-reth.yml
with:
image_tag: ghcr.io/paradigmxyz/reth:latest
binary_name: reth
cargo_features: "asm-keccak edge"
artifact_name: "reth-edge"
prepare-hive:
if: github.repository == 'paradigmxyz/reth'
@@ -77,6 +87,7 @@ jobs:
strategy:
fail-fast: false
matrix:
storage: [stable, edge]
# ethereum/rpc to be deprecated:
# https://github.com/ethereum/hive/pull/1117
scenario:
@@ -86,7 +97,7 @@ jobs:
- sim: devp2p
limit: discv4
# started failing after https://github.com/ethereum/go-ethereum/pull/31843, no
# action on our side, remove from here when we get unxpected passes on these tests
# action on our side, remove from here when we get unexpected passes on these tests
# - sim: devp2p
# limit: eth
# include:
@@ -176,9 +187,10 @@ jobs:
- sim: ethereum/eels/consume-rlp
limit: .*tests/paris.*
needs:
- prepare-reth
- prepare-reth-stable
- prepare-reth-edge
- prepare-hive
name: run ${{ matrix.scenario.sim }}${{ matrix.scenario.limit && format(' - {0}', matrix.scenario.limit) }}
name: ${{ matrix.storage }} / ${{ matrix.scenario.sim }}${{ matrix.scenario.limit && format(' - {0}', matrix.scenario.limit) }}
runs-on:
group: Reth
permissions:
@@ -197,7 +209,7 @@ jobs:
- name: Download reth image
uses: actions/download-artifact@v7
with:
name: artifacts
name: reth-${{ matrix.storage }}
path: /tmp
- name: Load Docker images

View File

@@ -21,6 +21,11 @@ on:
required: false
type: string
description: "Optional cargo package path"
artifact_name:
required: false
type: string
default: "artifacts"
description: "Name for the uploaded artifact"
jobs:
prepare-reth:
@@ -52,5 +57,5 @@ jobs:
id: upload
uses: actions/upload-artifact@v6
with:
name: artifacts
name: ${{ inputs.artifact_name }}
path: ./artifacts

View File

@@ -19,29 +19,22 @@ concurrency:
jobs:
test:
name: test / ${{ matrix.type }} (${{ matrix.partition }}/${{ matrix.total_partitions }})
name: test / ${{ matrix.type }} / ${{ matrix.storage }}
runs-on: depot-ubuntu-latest-4
env:
RUST_BACKTRACE: 1
EDGE_FEATURES: ${{ matrix.storage == 'edge' && 'edge' || '' }}
strategy:
matrix:
type: [ethereum, optimism]
storage: [stable, edge]
include:
- type: ethereum
args: --features "asm-keccak ethereum" --locked
partition: 1
total_partitions: 2
- type: ethereum
args: --features "asm-keccak ethereum" --locked
partition: 2
total_partitions: 2
features: asm-keccak ethereum
exclude_args: ""
- type: optimism
args: --features "asm-keccak" --locked --exclude reth --exclude reth-bench --exclude "example-*" --exclude "reth-ethereum-*" --exclude "*-ethereum"
partition: 1
total_partitions: 2
- type: optimism
args: --features "asm-keccak" --locked --exclude reth --exclude reth-bench --exclude "example-*" --exclude "reth-ethereum-*" --exclude "*-ethereum"
partition: 2
total_partitions: 2
features: asm-keccak
exclude_args: --exclude reth --exclude reth-bench --exclude "example-*" --exclude "reth-ethereum-*" --exclude "*-ethereum"
timeout-minutes: 30
steps:
- uses: actions/checkout@v6
@@ -59,9 +52,9 @@ jobs:
- name: Run tests
run: |
cargo nextest run \
${{ matrix.args }} --workspace \
--features "${{ matrix.features }} $EDGE_FEATURES" --locked \
${{ matrix.exclude_args }} --workspace \
--exclude ef-tests --no-tests=warn \
--partition hash:${{ matrix.partition }}/2 \
-E "!kind(test) and not binary(e2e_testsuite)"
state:

View File

@@ -18,7 +18,7 @@ Reth is a high-performance Ethereum execution client written in Rust, focusing o
6. **Pipeline (`crates/stages/`)**: Staged sync architecture for blockchain synchronization
7. **Trie (`crates/trie/`)**: Merkle Patricia Trie implementation with parallel state root computation
8. **Node Builder (`crates/node/`)**: High-level node orchestration and configuration
9 **The Consensus Engine (`crates/engine/`)**: Handles processing blocks received from the consensus layer with the Engine API (newPayload, forkchoiceUpdated)
9. **The Consensus Engine (`crates/engine/`)**: Handles processing blocks received from the consensus layer with the Engine API (newPayload, forkchoiceUpdated)
### Key Design Principles

View File

@@ -51,9 +51,7 @@ elsewhere.
<!-- - **Asking in the support Telegram:** The [Foundry Support Telegram][support-tg] is a fast and easy way to ask questions. -->
<!-- - **Opening a discussion:** This repository comes with a discussions board where you can also ask for help. Click the "Discussions" tab at the top. -->
If you have reviewed existing documentation and still have questions, or you are having problems, you can get help by *
*opening a discussion**. This repository comes with a discussions board where you can also ask for help. Click the "
Discussions" tab at the top.
If you have reviewed existing documentation and still have questions, or you are having problems, you can get help by **opening a discussion**. This repository comes with a discussions board where you can also ask for help. Click the "Discussions" tab at the top.
As Reth is still in heavy development, the documentation can be a bit scattered. The [Reth Docs][reth-docs] is our
current best-effort attempt at keeping up-to-date information.

98
Cargo.lock generated
View File

@@ -255,7 +255,6 @@ checksum = "926b2c0d34e641cf8b17bf54ce50fda16715b9f68ad878fa6128bae410c6f890"
dependencies = [
"alloy-primitives",
"alloy-rlp",
"borsh",
"serde",
]
@@ -458,7 +457,6 @@ dependencies = [
"proptest-derive 0.6.0",
"rand 0.9.2",
"rapidhash",
"rayon",
"ruint",
"rustc-hash",
"serde",
@@ -4565,7 +4563,6 @@ dependencies = [
"allocator-api2",
"equivalent",
"foldhash 0.2.0",
"rayon",
"serde",
"serde_core",
]
@@ -5069,7 +5066,6 @@ dependencies = [
"arbitrary",
"equivalent",
"hashbrown 0.16.1",
"rayon",
"serde",
"serde_core",
]
@@ -5255,6 +5251,23 @@ version = "1.0.17"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "92ecc6618181def0457392ccd0ee51198e065e016d1d527a7ac1b6dc7c1f09d2"
[[package]]
name = "jemalloc_pprof"
version = "0.8.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "74ff642505c7ce8d31c0d43ec0e235c6fd4585d9b8172d8f9dd04d36590200b5"
dependencies = [
"anyhow",
"libc",
"mappings",
"once_cell",
"pprof_util",
"tempfile",
"tikv-jemalloc-ctl",
"tokio",
"tracing",
]
[[package]]
name = "jni"
version = "0.21.1"
@@ -5785,6 +5798,19 @@ dependencies = [
"syn 2.0.113",
]
[[package]]
name = "mappings"
version = "0.7.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "db4d277bb50d4508057e7bddd7fcd19ef4a4cc38051b6a5a36868d75ae2cbeb9"
dependencies = [
"anyhow",
"libc",
"once_cell",
"pprof_util",
"tracing",
]
[[package]]
name = "match-lookup"
version = "0.1.1"
@@ -6525,7 +6551,7 @@ dependencies = [
"opentelemetry-http",
"opentelemetry-proto",
"opentelemetry_sdk",
"prost",
"prost 0.14.1",
"reqwest",
"thiserror 2.0.17",
"tokio",
@@ -6541,7 +6567,7 @@ checksum = "a7175df06de5eaee9909d4805a3d07e28bb752c34cab57fa9cff549da596b30f"
dependencies = [
"opentelemetry",
"opentelemetry_sdk",
"prost",
"prost 0.14.1",
"tonic",
"tonic-prost",
]
@@ -6882,6 +6908,20 @@ version = "0.2.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "439ee305def115ba05938db6eb1644ff94165c5ab5e9420d1c1bcedbba909391"
[[package]]
name = "pprof_util"
version = "0.8.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "4429d44e5e2c8a69399fc0070379201eed018e3df61e04eb7432811df073c224"
dependencies = [
"anyhow",
"backtrace",
"flate2",
"num",
"paste",
"prost 0.13.5",
]
[[package]]
name = "ppv-lite86"
version = "0.2.21"
@@ -7068,6 +7108,16 @@ dependencies = [
"syn 2.0.113",
]
[[package]]
name = "prost"
version = "0.13.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "2796faa41db3ec313a31f7624d9286acf277b52de526150b7e69f3debf891ee5"
dependencies = [
"bytes",
"prost-derive 0.13.5",
]
[[package]]
name = "prost"
version = "0.14.1"
@@ -7075,7 +7125,20 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7231bd9b3d3d33c86b58adbac74b5ec0ad9f496b19d22801d773636feaa95f3d"
dependencies = [
"bytes",
"prost-derive",
"prost-derive 0.14.1",
]
[[package]]
name = "prost-derive"
version = "0.13.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8a56d757972c98b346a9b766e3f02746cde6dd1cd1d1d563472929fdd74bec4d"
dependencies = [
"anyhow",
"itertools 0.14.0",
"proc-macro2",
"quote",
"syn 2.0.113",
]
[[package]]
@@ -7625,6 +7688,7 @@ version = "1.9.3"
dependencies = [
"alloy-eips",
"alloy-json-rpc",
"alloy-network",
"alloy-primitives",
"alloy-provider",
"alloy-pubsub",
@@ -7646,6 +7710,7 @@ dependencies = [
"reqwest",
"reth-cli-runner",
"reth-cli-util",
"reth-engine-primitives",
"reth-fs-util",
"reth-node-api",
"reth-node-core",
@@ -7657,6 +7722,7 @@ dependencies = [
"tokio",
"tower",
"tracing",
"url",
]
[[package]]
@@ -7665,7 +7731,9 @@ version = "1.9.3"
dependencies = [
"alloy-primitives",
"alloy-provider",
"alloy-rpc-client",
"alloy-rpc-types-eth",
"alloy-transport-ws",
"chrono",
"clap",
"csv",
@@ -8270,7 +8338,6 @@ dependencies = [
"reth-network-peers",
"secp256k1 0.30.0",
"sha2",
"sha3",
"thiserror 2.0.17",
"tokio",
"tokio-stream",
@@ -8410,7 +8477,6 @@ dependencies = [
"reth-stages",
"reth-stages-api",
"reth-static-file",
"reth-storage-errors",
"reth-tasks",
"reth-testing-utils",
"reth-tracing",
@@ -8885,6 +8951,7 @@ dependencies = [
"reth-tasks",
"reth-testing-utils",
"reth-tracing",
"reth-trie-common",
"rmp-serde",
"secp256k1 0.30.0",
"tempfile",
@@ -9481,18 +9548,25 @@ dependencies = [
name = "reth-node-metrics"
version = "1.9.3"
dependencies = [
"bytes",
"eyre",
"http",
"http-body-util",
"jemalloc_pprof",
"jsonrpsee-server",
"mappings",
"metrics",
"metrics-exporter-prometheus",
"metrics-process",
"metrics-util",
"pprof_util",
"procfs 0.17.0",
"reqwest",
"reth-fs-util",
"reth-metrics",
"reth-tasks",
"socket2 0.5.10",
"tempfile",
"tikv-jemalloc-ctl",
"tokio",
"tower",
@@ -10280,6 +10354,7 @@ version = "1.9.3"
dependencies = [
"alloy-consensus",
"alloy-dyn-abi",
"alloy-eip7928",
"alloy-eips",
"alloy-evm",
"alloy-genesis",
@@ -10362,6 +10437,7 @@ dependencies = [
name = "reth-rpc-api"
version = "1.9.3"
dependencies = [
"alloy-eip7928",
"alloy-eips",
"alloy-genesis",
"alloy-json-rpc",
@@ -11109,6 +11185,8 @@ dependencies = [
"reth-execution-errors",
"reth-primitives-traits",
"reth-provider",
"reth-storage-api",
"reth-storage-errors",
"reth-trie",
"reth-trie-common",
"revm",
@@ -13028,7 +13106,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "66bd50ad6ce1252d87ef024b3d64fe4c3cf54a86fb9ef4c631fdd0ded7aeaa67"
dependencies = [
"bytes",
"prost",
"prost 0.14.1",
"tonic",
]

View File

@@ -487,7 +487,7 @@ revm-inspectors = "0.33.2"
alloy-chains = { version = "0.2.5", default-features = false }
alloy-dyn-abi = "1.4.1"
alloy-eip2124 = { version = "0.2.0", default-features = false }
alloy-eip7928 = { version = "0.1.0" }
alloy-eip7928 = { version = "0.1.0", default-features = false }
alloy-evm = { version = "0.25.1", default-features = false }
alloy-primitives = { version = "1.5.0", default-features = false, features = ["map-foldhash"] }
alloy-rlp = { version = "0.3.10", default-features = false, features = ["core-net"] }
@@ -684,6 +684,7 @@ ethereum_ssz = "0.9.0"
ethereum_ssz_derive = "0.9.0"
# allocators
jemalloc_pprof = { version = "0.8", default-features = false }
tikv-jemalloc-ctl = "0.6"
tikv-jemallocator = "0.6"
tracy-client = "0.18.0"

View File

@@ -276,6 +276,11 @@ docker-build-push-latest: ## Build and push a cross-arch Docker image tagged wit
docker-build-push-nightly: ## Build and push cross-arch Docker image tagged with the latest git tag with a `-nightly` suffix, and `latest-nightly`.
$(call docker_build_push,nightly,nightly)
.PHONY: docker-build-push-nightly-edge-profiling
docker-build-push-nightly-edge-profiling: FEATURES := $(FEATURES) edge
docker-build-push-nightly-edge-profiling: ## Build and push cross-arch Docker image with edge features tagged with `nightly-edge-profiling`.
$(call docker_build_push,nightly-edge-profiling,nightly-edge-profiling)
# Create a cross-arch Docker image with the given tags and push it
define docker_build_push
$(MAKE) build-x86_64-unknown-linux-gnu
@@ -328,6 +333,11 @@ op-docker-build-push-latest: ## Build and push a cross-arch Docker image tagged
op-docker-build-push-nightly: ## Build and push cross-arch Docker image tagged with the latest git tag with a `-nightly` suffix, and `latest-nightly`.
$(call op_docker_build_push,nightly,nightly)
.PHONY: op-docker-build-push-nightly-edge-profiling
op-docker-build-push-nightly-edge-profiling: FEATURES := $(FEATURES) edge
op-docker-build-push-nightly-edge-profiling: ## Build and push cross-arch Docker image with edge features tagged with `nightly-edge-profiling`.
$(call op_docker_build_push,nightly-edge-profiling,nightly-edge-profiling)
# Note: This requires a buildx builder with emulation support. For example:
#
# `docker run --privileged --rm tonistiigi/binfmt --install amd64,arm64`

View File

@@ -44,7 +44,7 @@ More historical context below:
- We released 1.0 "production-ready" stable Reth in June 2024.
- Reth completed an audit with [Sigma Prime](https://sigmaprime.io/), the developers of [Lighthouse](https://github.com/sigp/lighthouse), the Rust Consensus Layer implementation. Find it [here](./audit/sigma_prime_audit_v2.pdf).
- Revm (the EVM used in Reth) underwent an audit with [Guido Vranken](https://x.com/guidovranken) (#1 [Ethereum Bug Bounty](https://ethereum.org/en/bug-bounty)). We will publish the results soon.
- We released multiple iterative beta versions, up to [beta.9](https://github.com/paradigmxyz/reth/releases/tag/v0.2.0-beta.9) on Monday June 3, 2024,the last beta release.
- We released multiple iterative beta versions, up to [beta.9](https://github.com/paradigmxyz/reth/releases/tag/v0.2.0-beta.9) on Monday June 3, 2024, the last beta release.
- We released [beta](https://github.com/paradigmxyz/reth/releases/tag/v0.2.0-beta.1) on Monday March 4, 2024, our first breaking change to the database model, providing faster query speed, smaller database footprint, and allowing "history" to be mounted on separate drives.
- We shipped iterative improvements until the last alpha release on February 28, 2024, [0.1.0-alpha.21](https://github.com/paradigmxyz/reth/releases/tag/v0.1.0-alpha.21).
- We [initially announced](https://www.paradigm.xyz/2023/06/reth-alpha) [0.1.0-alpha.1](https://github.com/paradigmxyz/reth/releases/tag/v0.1.0-alpha.1) on June 20, 2023.

View File

@@ -25,7 +25,9 @@ reth-chainspec.workspace = true
# alloy
alloy-provider = { workspace = true, features = ["reqwest-rustls-tls"], default-features = false }
alloy-rpc-client = { workspace = true, features = ["pubsub"] }
alloy-rpc-types-eth.workspace = true
alloy-transport-ws.workspace = true
alloy-primitives.workspace = true
# CLI and argument parsing

View File

@@ -0,0 +1,50 @@
# reth-bench-compare
Compare reth performance between two git references.
## Usage
```bash
reth-bench-compare \
--baseline-ref main \
--feature-ref my-feature \
--blocks 100 \
--wait-for-persistence
```
## Arguments
| Argument | Description | Default | Required |
|----------|-------------|---------|----------|
| `--baseline-ref <REF>` | Git reference for baseline | - | Yes |
| `--feature-ref <REF>` | Git reference to compare | - | Yes |
| `--blocks <N>` | Number of blocks to benchmark | `100` | No |
| `--chain <CHAIN>` | Chain to benchmark | `mainnet` | No |
| `--datadir <PATH>` | Data directory path | OS-specific | No |
| `--rpc-url <URL>` | RPC endpoint for block data | Chain default | No |
| `--output-dir <PATH>` | Output directory | `./reth-bench-compare` | No |
| `--wait-for-persistence` | Wait for block persistence | `false` | No |
| `--persistence-threshold <N>` | Wait after every N+1 blocks | `2` | No |
| `--wait-time <DURATION>` | Fixed delay (legacy) | - | No |
| `--warmup-blocks <N>` | Cache warmup blocks | Same as `--blocks` | No |
| `--draw` | Generate charts (needs Python/uv) | `false` | No |
| `--profile` | Enable CPU profiling (needs samply) | `false` | No |
| `-vvvv` | Debug logging | Info | No |
| `--features <FEATURES>` | Rust features for both builds | `jemalloc,asm-keccak` | No |
| `--rustflags <FLAGS>` | RUSTFLAGS for both builds | `-C target-cpu=native` | No |
| `--baseline-features <FEATURES>` | Features for baseline only | Inherits `--features` | No |
| `--feature-features <FEATURES>` | Features for feature only | Inherits `--features` | No |
| `--baseline-rustflags <FLAGS>` | RUSTFLAGS for baseline only | Inherits `--rustflags` | No |
| `--feature-rustflags <FLAGS>` | RUSTFLAGS for feature only | Inherits `--rustflags` | No |
| `--baseline-args <ARGS>` | Extra args for baseline node | - | No |
| `--feature-args <ARGS>` | Extra args for feature node | - | No |
| `--metrics-port <PORT>` | Metrics endpoint port | `5005` | No |
| `--sudo` | Run with elevated privileges | `false` | No |
## Output
Results in `./reth-bench-compare/results/<timestamp>/`:
- `comparison_report.json` - Metrics comparison
- `per_block_comparison.csv` - Per-block statistics
- `baseline/` and `feature/` - Individual run results
- `latency_comparison.png` - Chart (if `--draw` used)

View File

@@ -18,6 +18,8 @@ pub(crate) struct BenchmarkRunner {
rpc_url: String,
jwt_secret: String,
wait_time: Option<String>,
wait_for_persistence: bool,
persistence_threshold: Option<u64>,
warmup_blocks: u64,
}
@@ -28,6 +30,8 @@ impl BenchmarkRunner {
rpc_url: args.get_rpc_url(),
jwt_secret: args.jwt_secret_path().to_string_lossy().to_string(),
wait_time: args.wait_time.clone(),
wait_for_persistence: args.wait_for_persistence,
persistence_threshold: args.persistence_threshold,
warmup_blocks: args.get_warmup_blocks(),
}
}
@@ -96,13 +100,9 @@ impl BenchmarkRunner {
&from_block.to_string(),
"--to",
&to_block.to_string(),
"--wait-time=0ms", // Warmup should avoid persistence waits.
]);
// Add wait-time argument if provided
if let Some(ref wait_time) = self.wait_time {
cmd.args(["--wait-time", wait_time]);
}
cmd.env("RUST_LOG_STYLE", "never")
.stdout(std::process::Stdio::piped())
.stderr(std::process::Stdio::piped())
@@ -186,9 +186,16 @@ impl BenchmarkRunner {
&output_dir.to_string_lossy(),
]);
// Add wait-time argument if provided
// Configure wait mode: wait-time takes precedence over persistence-based flow
if let Some(ref wait_time) = self.wait_time {
cmd.args(["--wait-time", wait_time]);
} else if self.wait_for_persistence {
cmd.arg("--wait-for-persistence");
// Add persistence threshold if specified
if let Some(threshold) = self.persistence_threshold {
cmd.args(["--persistence-threshold", &threshold.to_string()]);
}
}
cmd.env("RUST_LOG_STYLE", "never")

View File

@@ -114,10 +114,29 @@ pub(crate) struct Args {
#[arg(long)]
pub profile: bool,
/// Wait time between engine API calls (passed to reth-bench)
#[arg(long, value_name = "DURATION")]
/// Optional fixed delay between engine API calls (passed to reth-bench).
///
/// When set, reth-bench uses wait-time mode and disables persistence-based flow.
/// This flag remains for compatibility with older scripts.
#[arg(long, value_name = "DURATION", hide = true)]
pub wait_time: Option<String>,
/// Wait for blocks to be persisted before sending the next batch (passed to reth-bench).
///
/// When enabled, waits for every Nth block to be persisted using the
/// `reth_subscribePersistedBlock` subscription. This ensures the benchmark
/// doesn't outpace persistence.
#[arg(long)]
pub wait_for_persistence: bool,
/// Engine persistence threshold (passed to reth-bench).
///
/// The benchmark waits after every `(threshold + 1)` blocks. By default this
/// matches the engine's default persistence threshold (2), so waits occur
/// at blocks 3, 6, 9, etc.
#[arg(long, value_name = "PERSISTENCE_THRESHOLD")]
pub persistence_threshold: Option<u64>,
/// Number of blocks to run for cache warmup after clearing caches.
/// If not specified, defaults to the same as --blocks
#[arg(long, value_name = "N")]
@@ -512,6 +531,7 @@ async fn run_compilation_phase(
Ok((baseline_commit, feature_commit))
}
#[allow(clippy::too_many_arguments)]
/// Run warmup phase to warm up caches before benchmarking
async fn run_warmup_phase(
git_manager: &GitManager,
@@ -521,9 +541,15 @@ async fn run_warmup_phase(
args: &Args,
is_optimism: bool,
baseline_commit: &str,
starting_tip: u64,
) -> Result<()> {
info!("=== Running warmup phase ===");
// Unwind to starting block minus warmup blocks, so we end up back at starting_tip
let warmup_blocks = args.get_warmup_blocks();
let unwind_target = starting_tip.saturating_sub(warmup_blocks);
node_manager.unwind_to_block(unwind_target).await?;
// Use baseline for warmup
let warmup_ref = &args.baseline_ref;
@@ -552,12 +578,9 @@ async fn run_warmup_phase(
node_manager.start_node(&binary_path, warmup_ref, "warmup", &additional_args).await?;
// Wait for node to be ready and get its current tip
let current_tip = node_manager.wait_for_node_ready_and_get_tip().await?;
let current_tip = node_manager.wait_for_node_ready_and_get_tip(&mut node_process).await?;
info!("Warmup node is ready at tip: {}", current_tip);
// Store the tip we'll unwind back to
let original_tip = current_tip;
// Clear filesystem caches before warmup run only (unless disabled)
if args.no_clear_cache {
info!("Skipping filesystem cache clearing (--no-clear-cache flag set)");
@@ -568,12 +591,9 @@ async fn run_warmup_phase(
// Run warmup to warm up caches
benchmark_runner.run_warmup(current_tip).await?;
// Stop node before unwinding (node must be stopped to release database lock)
// Stop node after warmup
node_manager.stop_node(&mut node_process).await?;
// Unwind back to starting block after warmup
node_manager.unwind_to_block(original_tip).await?;
info!("Warmup phase completed");
Ok(())
}
@@ -595,6 +615,27 @@ async fn run_benchmark_workflow(
let (baseline_commit, feature_commit) =
run_compilation_phase(git_manager, compilation_manager, args, is_optimism).await?;
// Switch to baseline reference and get the starting tip
git_manager.switch_ref(&args.baseline_ref)?;
let binary_path =
compilation_manager.get_cached_binary_path_for_commit(&baseline_commit, is_optimism);
if !binary_path.exists() {
return Err(eyre!(
"Cached baseline binary not found at {:?}. Compilation phase should have created it.",
binary_path
));
}
// Start node briefly to get the current tip, then stop it
info!("=== Determining initial block height ===");
let additional_args = args.build_additional_args("baseline", args.baseline_args.as_ref());
let (mut node_process, _) = node_manager
.start_node(&binary_path, &args.baseline_ref, "baseline", &additional_args)
.await?;
let starting_tip = node_manager.wait_for_node_ready_and_get_tip(&mut node_process).await?;
info!("Node starting tip: {}", starting_tip);
node_manager.stop_node(&mut node_process).await?;
// Run warmup phase before benchmarking (skip if warmup_blocks is 0)
if args.get_warmup_blocks() > 0 {
run_warmup_phase(
@@ -605,6 +646,7 @@ async fn run_benchmark_workflow(
args,
is_optimism,
&baseline_commit,
starting_tip,
)
.await?;
} else {
@@ -620,6 +662,10 @@ async fn run_benchmark_workflow(
let commit = commits[i];
info!("=== Processing {} reference: {} ===", ref_type, git_ref);
// Unwind to starting block minus benchmark blocks, so we end up back at starting_tip
let unwind_target = starting_tip.saturating_sub(args.blocks);
node_manager.unwind_to_block(unwind_target).await?;
// Switch to target reference
git_manager.switch_ref(git_ref)?;
@@ -653,17 +699,14 @@ async fn run_benchmark_workflow(
node_manager.start_node(&binary_path, git_ref, ref_type, &additional_args).await?;
// Wait for node to be ready and get its current tip (wherever it is)
let current_tip = node_manager.wait_for_node_ready_and_get_tip().await?;
let current_tip = node_manager.wait_for_node_ready_and_get_tip(&mut node_process).await?;
info!("Node is ready at tip: {}", current_tip);
// Store the tip we'll unwind back to
let original_tip = current_tip;
// Calculate benchmark range
// Note: reth-bench has an off-by-one error where it consumes the first block
// of the range, so we add 1 to compensate and get exactly args.blocks blocks
let from_block = original_tip;
let to_block = original_tip + args.blocks;
let from_block = current_tip;
let to_block = current_tip + args.blocks;
// Run benchmark
let output_dir = comparison_generator.get_ref_output_dir(ref_type);
@@ -680,9 +723,6 @@ async fn run_benchmark_workflow(
// Stop node
node_manager.stop_node(&mut node_process).await?;
// Unwind back to original tip
node_manager.unwind_to_block(original_tip).await?;
// Store results for comparison
comparison_generator.add_ref_results(ref_type, &output_dir)?;

View File

@@ -99,6 +99,7 @@ pub(crate) struct RefInfo {
/// Summary of the comparison between references.
///
/// Percent deltas are `(feature - baseline) / baseline * 100`:
/// - `new_payload_latency_mean_change_percent`: percent changes of the per-block means.
/// - `new_payload_latency_p50_change_percent` / p90 / p99: percent changes of the respective
/// per-block percentiles.
/// - `per_block_latency_change_mean_percent` / `per_block_latency_change_median_percent` are the
@@ -116,6 +117,7 @@ pub(crate) struct ComparisonSummary {
pub per_block_latency_change_median_percent: f64,
pub per_block_latency_change_std_dev_percent: f64,
pub new_payload_total_latency_change_percent: f64,
pub new_payload_latency_mean_change_percent: f64,
pub new_payload_latency_p50_change_percent: f64,
pub new_payload_latency_p90_change_percent: f64,
pub new_payload_latency_p99_change_percent: f64,
@@ -445,6 +447,10 @@ impl ComparisonGenerator {
per_block_latency_change_median_percent,
per_block_latency_change_std_dev_percent,
new_payload_total_latency_change_percent,
new_payload_latency_mean_change_percent: calc_percent_change(
baseline.mean_new_payload_latency_ms,
feature.mean_new_payload_latency_ms,
),
new_payload_latency_p50_change_percent: calc_percent_change(
baseline.median_new_payload_latency_ms,
feature.median_new_payload_latency_ms,
@@ -575,6 +581,10 @@ impl ComparisonGenerator {
" Total newPayload time change: {:+.2}%",
summary.new_payload_total_latency_change_percent
);
println!(
" NewPayload Latency mean: {:+.2}%",
summary.new_payload_latency_mean_change_percent
);
println!(
" NewPayload Latency p50: {:+.2}%",
summary.new_payload_latency_p50_change_percent

View File

@@ -121,8 +121,7 @@ impl CompilationManager {
cmd.env("RUSTFLAGS", rustflags);
info!("Using RUSTFLAGS: {rustflags}");
// Debug log the command
debug!("Executing cargo command: {:?}", cmd);
info!("Compiling {binary_name} with {cmd:?}");
let output = cmd.output().wrap_err("Failed to execute cargo build command")?;
@@ -231,8 +230,7 @@ impl CompilationManager {
let mut cmd = Command::new("cargo");
cmd.args(["install", "--locked", "samply"]);
// Debug log the command
debug!("Executing cargo command: {:?}", cmd);
info!("Installing samply with {cmd:?}");
let output = cmd.output().wrap_err("Failed to execute cargo install samply command")?;
@@ -307,8 +305,7 @@ impl CompilationManager {
let mut cmd = Command::new("make");
cmd.arg("install-reth-bench").current_dir(&self.repo_root);
// Debug log the command
debug!("Executing make command: {:?}", cmd);
info!("Compiling reth-bench with {cmd:?}");
let output = cmd.output().wrap_err("Failed to execute make install-reth-bench command")?;

View File

@@ -2,7 +2,9 @@
use crate::cli::Args;
use alloy_provider::{Provider, ProviderBuilder};
use alloy_rpc_client::RpcClient;
use alloy_rpc_types_eth::SyncStatus;
use alloy_transport_ws::WsConnect;
use eyre::{eyre, OptionExt, Result, WrapErr};
#[cfg(unix)]
use nix::sys::signal::{killpg, Signal};
@@ -18,6 +20,9 @@ use tokio::{
};
use tracing::{debug, info, warn};
/// Default websocket RPC port used by reth
const DEFAULT_WS_RPC_PORT: u16 = 8546;
/// Manages reth node lifecycle and operations
pub(crate) struct NodeManager {
datadir: Option<String>,
@@ -152,7 +157,10 @@ impl NodeManager {
metrics_arg,
"--http".to_string(),
"--http.api".to_string(),
"eth".to_string(),
"eth,reth".to_string(),
"--ws".to_string(),
"--ws.api".to_string(),
"eth,reth".to_string(),
"--disable-discovery".to_string(),
"--trusted-only".to_string(),
]);
@@ -359,8 +367,13 @@ impl NodeManager {
Ok((child, reth_command))
}
/// Wait for the node to be ready and return its current tip
pub(crate) async fn wait_for_node_ready_and_get_tip(&self) -> Result<u64> {
/// Wait for the node to be ready and return its current tip.
///
/// Fails early if the node process exits before becoming ready.
pub(crate) async fn wait_for_node_ready_and_get_tip(
&self,
child: &mut tokio::process::Child,
) -> Result<u64> {
info!("Waiting for node to be ready and synced...");
let max_wait = Duration::from_secs(120); // 2 minutes to allow for sync
@@ -371,8 +384,23 @@ impl NodeManager {
let url = rpc_url.parse().map_err(|e| eyre!("Invalid RPC URL '{}': {}", rpc_url, e))?;
let provider = ProviderBuilder::new().connect_http(url);
let start_time = tokio::time::Instant::now();
let mut iteration = 0;
timeout(max_wait, async {
loop {
iteration += 1;
debug!(
"Readiness check iteration {} (elapsed: {:?})",
iteration,
start_time.elapsed()
);
// Check if the node process has exited.
if let Some(status) = child.try_wait()? {
return Err(eyre!("Node process exited unexpectedly with {status}"));
}
// First check if RPC is up and node is not syncing
match provider.syncing().await {
Ok(sync_result) => {
@@ -381,24 +409,48 @@ impl NodeManager {
debug!("Node is still syncing {sync_info:?}, waiting...");
}
_ => {
debug!("HTTP RPC is up and node is not syncing, checking block number...");
// Node is not syncing, now get the tip
match provider.get_block_number().await {
Ok(tip) => {
info!("Node is ready and not syncing at block: {}", tip);
return Ok(tip);
debug!("HTTP RPC ready at block: {}, checking WebSocket...", tip);
// Verify WebSocket RPC is ready (public endpoint, no JWT required)
let ws_url = format!("ws://localhost:{}", DEFAULT_WS_RPC_PORT);
debug!("Attempting WebSocket connection to {} (public endpoint)", ws_url);
let ws_connect = WsConnect::new(&ws_url);
match RpcClient::connect_pubsub(ws_connect).await
{
Ok(_) => {
info!(
"Node is ready (HTTP and WebSocket) at block: {} (took {:?}, {} iterations)",
tip, start_time.elapsed(), iteration
);
return Ok(tip);
}
Err(e) => {
debug!(
"HTTP RPC ready but WebSocket not ready yet (iteration {}): {:?}",
iteration, e
);
debug!("WebSocket error details: {}", e);
}
}
}
Err(e) => {
debug!("Failed to get block number: {}", e);
debug!("Failed to get block number (iteration {}): {:?}", iteration, e);
}
}
}
}
}
Err(e) => {
debug!("Node RPC not ready yet or failed to check sync status: {}", e);
debug!("Node RPC not ready yet or failed to check sync status (iteration {}): {:?}", iteration, e);
}
}
debug!("Sleeping for {:?} before next check", check_interval);
sleep(check_interval).await;
}
})

View File

@@ -16,6 +16,7 @@ workspace = true
# reth
reth-cli-runner.workspace = true
reth-cli-util.workspace = true
reth-engine-primitives.workspace = true
reth-fs-util.workspace = true
reth-node-api.workspace = true
reth-node-core.workspace = true
@@ -25,10 +26,11 @@ reth-tracing.workspace = true
# alloy
alloy-eips.workspace = true
alloy-json-rpc.workspace = true
alloy-network.workspace = true
alloy-primitives.workspace = true
alloy-provider = { workspace = true, features = ["engine-api", "reqwest-rustls-tls"], default-features = false }
alloy-provider = { workspace = true, features = ["engine-api", "pubsub", "reqwest-rustls-tls"], default-features = false }
alloy-pubsub.workspace = true
alloy-rpc-client.workspace = true
alloy-rpc-client = { workspace = true, features = ["pubsub"] }
alloy-rpc-types-engine.workspace = true
alloy-transport-http.workspace = true
alloy-transport-ipc.workspace = true
@@ -50,6 +52,9 @@ tracing.workspace = true
serde.workspace = true
serde_json.workspace = true
# url parsing
url.workspace = true
# async
async-trait.workspace = true
futures.workspace = true

View File

@@ -31,6 +31,14 @@ Otherwise, running `make maxperf` at the root of the repo should be sufficient f
`reth-bench` contains different commands to benchmark different patterns of engine API calls.
The `reth-bench new-payload-fcu` command is the most representative of ethereum mainnet live sync, alternating between sending `engine_newPayload` calls and `engine_forkchoiceUpdated` calls.
The `new-payload-fcu` command supports two optional waiting modes that can be used together or independently:
- `--wait-time <duration>`: Fixed sleep interval between blocks (e.g., `--wait-time 100ms`)
- `--wait-for-persistence`: Waits for blocks to be persisted using the `reth_subscribePersistedBlock` subscription
When using `--wait-for-persistence`, the benchmark waits after every `(threshold + 1)` blocks, where the threshold defaults to the engine's persistence threshold (2). This can be customized with `--persistence-threshold <N>`.
By default, the WebSocket URL for persistence subscriptions is derived from `--engine-rpc-url` (converting to ws:// on port 8546). Use `--ws-rpc-url` to override this.
Below is an overview of how to run a benchmark:
### Setup

View File

@@ -163,7 +163,7 @@ impl AuthenticatedTransport {
// shift the iat forward by one second so there is some buffer time
let mut shifted_claims = inner_and_claims.1;
shifted_claims.iat -= 1;
shifted_claims.iat -= 30;
// if the claims are out of date, reset the inner transport
if !shifted_claims.is_within_time_window() {

View File

@@ -1,5 +1,13 @@
//! Runs the `reth bench` command, calling first newPayload for each block, then calling
//! forkchoiceUpdated.
//!
//! Supports configurable waiting behavior:
//! - **`--wait-time`**: Fixed sleep interval between blocks.
//! - **`--wait-for-persistence`**: Waits for every Nth block to be persisted using the
//! `reth_subscribePersistedBlock` subscription, where N matches the engine's persistence
//! threshold. This ensures the benchmark doesn't outpace persistence.
//!
//! Both options can be used together or independently.
use crate::{
bench::{
@@ -11,16 +19,26 @@ use crate::{
},
valid_payload::{block_to_new_payload, call_forkchoice_updated, call_new_payload},
};
use alloy_provider::Provider;
use alloy_eips::BlockNumHash;
use alloy_network::Ethereum;
use alloy_provider::{Provider, RootProvider};
use alloy_pubsub::SubscriptionStream;
use alloy_rpc_client::RpcClient;
use alloy_rpc_types_engine::ForkchoiceState;
use alloy_transport_ws::WsConnect;
use clap::Parser;
use csv::Writer;
use eyre::{Context, OptionExt};
use futures::StreamExt;
use humantime::parse_duration;
use reth_cli_runner::CliContext;
use reth_engine_primitives::config::DEFAULT_PERSISTENCE_THRESHOLD;
use reth_node_core::args::BenchmarkArgs;
use std::time::{Duration, Instant};
use tracing::{debug, info};
use url::Url;
const PERSISTENCE_CHECKPOINT_TIMEOUT: Duration = Duration::from_secs(60);
/// `reth benchmark new-payload-fcu` command
#[derive(Debug, Parser)]
@@ -30,8 +48,31 @@ pub struct Command {
rpc_url: String,
/// How long to wait after a forkchoice update before sending the next payload.
#[arg(long, value_name = "WAIT_TIME", value_parser = parse_duration, default_value = "250ms", verbatim_doc_comment)]
wait_time: Duration,
#[arg(long, value_name = "WAIT_TIME", value_parser = parse_duration, verbatim_doc_comment)]
wait_time: Option<Duration>,
/// Wait for blocks to be persisted before sending the next batch.
///
/// When enabled, waits for every Nth block to be persisted using the
/// `reth_subscribePersistedBlock` subscription. This ensures the benchmark
/// doesn't outpace persistence.
///
/// The subscription uses the regular RPC websocket endpoint (no JWT required).
#[arg(long, default_value = "false", verbatim_doc_comment)]
wait_for_persistence: bool,
/// Engine persistence threshold used for deciding when to wait for persistence.
///
/// The benchmark waits after every `(threshold + 1)` blocks. By default this
/// matches the engine's `DEFAULT_PERSISTENCE_THRESHOLD` (2), so waits occur
/// at blocks 3, 6, 9, etc.
#[arg(
long = "persistence-threshold",
value_name = "PERSISTENCE_THRESHOLD",
default_value_t = DEFAULT_PERSISTENCE_THRESHOLD,
verbatim_doc_comment
)]
persistence_threshold: u64,
/// The size of the block buffer (channel capacity) for prefetching blocks from the RPC
/// endpoint.
@@ -50,6 +91,32 @@ pub struct Command {
impl Command {
/// Execute `benchmark new-payload-fcu` command
pub async fn execute(self, _ctx: CliContext) -> eyre::Result<()> {
// Log mode configuration
if let Some(duration) = self.wait_time {
info!("Using wait-time mode with {}ms delay between blocks", duration.as_millis());
}
if self.wait_for_persistence {
info!(
"Persistence waiting enabled (waits after every {} blocks to match engine gap > {} behavior)",
self.persistence_threshold + 1,
self.persistence_threshold
);
}
// Set up waiter based on configured options (duration takes precedence)
let mut waiter = match (self.wait_time, self.wait_for_persistence) {
(Some(duration), _) => Some(PersistenceWaiter::with_duration(duration)),
(None, true) => {
let sub = self.setup_persistence_subscription().await?;
Some(PersistenceWaiter::with_subscription(
sub,
self.persistence_threshold,
PERSISTENCE_CHECKPOINT_TIMEOUT,
))
}
(None, false) => None,
};
let BenchContext {
benchmark_mode,
block_provider,
@@ -110,7 +177,6 @@ impl Command {
}
});
// put results in a summary vec so they can be printed at the end
let mut results = Vec::new();
let total_benchmark_duration = Instant::now();
let mut total_wait_time = Duration::ZERO;
@@ -121,14 +187,12 @@ impl Command {
total_wait_time += wait_start.elapsed();
result
} {
// just put gas used here
let gas_used = block.header.gas_used;
let block_number = block.header.number;
let transaction_count = block.transactions.len() as u64;
debug!(target: "reth-bench", ?block_number, "Sending payload",);
debug!(target: "reth-bench", ?block_number, "Sending payload");
// construct fcu to call
let forkchoice_state = ForkchoiceState {
head_block_hash: head,
safe_block_hash: safe,
@@ -143,7 +207,6 @@ impl Command {
call_forkchoice_updated(&auth_provider, version, forkchoice_state, None).await?;
// calculate the total duration and the fcu latency, record
let total_latency = start.elapsed();
let fcu_latency = total_latency - new_payload_result.latency;
let combined_result = CombinedResult {
@@ -154,17 +217,15 @@ impl Command {
total_latency,
};
// current duration since the start of the benchmark minus the time
// waiting for blocks
// Exclude time spent waiting on the block prefetch channel from the benchmark duration.
// We want to measure engine throughput, not RPC fetch latency.
let current_duration = total_benchmark_duration.elapsed() - total_wait_time;
// convert gas used to gigagas, then compute gigagas per second
info!(%combined_result);
// wait before sending the next payload
tokio::time::sleep(self.wait_time).await;
if let Some(w) = &mut waiter {
w.on_block(block_number).await?;
}
// record the current result
let gas_row =
TotalGasRow { block_number, transaction_count, gas_used, time: current_duration };
results.push((gas_row, combined_result));
@@ -175,24 +236,26 @@ impl Command {
return Err(error);
}
// Drop waiter - we don't need to wait for final blocks to persist
// since the benchmark goal is measuring Ggas/s of newPayload/FCU, not persistence.
drop(waiter);
let (gas_output_results, combined_results): (_, Vec<CombinedResult>) =
results.into_iter().unzip();
// write the csv output to files
if let Some(path) = self.benchmark.output {
// first write the combined results to a file
// Write CSV output files
if let Some(ref path) = self.benchmark.output {
let output_path = path.join(COMBINED_OUTPUT_SUFFIX);
info!("Writing engine api call latency output to file: {:?}", output_path);
let mut writer = Writer::from_path(output_path)?;
let mut writer = Writer::from_path(&output_path)?;
for result in combined_results {
writer.serialize(result)?;
}
writer.flush()?;
// now write the gas output to a file
let output_path = path.join(GAS_OUTPUT_SUFFIX);
info!("Writing total gas output to file: {:?}", output_path);
let mut writer = Writer::from_path(output_path)?;
let mut writer = Writer::from_path(&output_path)?;
for row in &gas_output_results {
writer.serialize(row)?;
}
@@ -201,8 +264,8 @@ impl Command {
info!("Finished writing benchmark output files to {:?}.", path);
}
// accumulate the results and calculate the overall Ggas/s
let gas_output = TotalGasOutput::new(gas_output_results)?;
info!(
total_duration=?gas_output.total_duration,
total_gas_used=?gas_output.total_gas_used,
@@ -213,4 +276,278 @@ impl Command {
Ok(())
}
/// Returns the websocket RPC URL used for the persistence subscription.
///
/// Preference:
/// - If `--ws-rpc-url` is provided, use it directly.
/// - Otherwise, derive a WS RPC URL from `--engine-rpc-url`.
///
/// The persistence subscription endpoint (`reth_subscribePersistedBlock`) is exposed on
/// the regular RPC server (WS port, usually 8546), not on the engine API port (usually 8551).
/// Since `BenchmarkArgs` only has the engine URL by default, we convert the scheme
/// (http→ws, https→wss) and force the port to 8546.
fn derive_ws_rpc_url(&self) -> eyre::Result<Url> {
if let Some(ref ws_url) = self.benchmark.ws_rpc_url {
let parsed: Url = ws_url
.parse()
.wrap_err_with(|| format!("Failed to parse WebSocket RPC URL: {ws_url}"))?;
info!(target: "reth-bench", ws_url = %parsed, "Using provided WebSocket RPC URL");
Ok(parsed)
} else {
let derived = engine_url_to_ws_url(&self.benchmark.engine_rpc_url)?;
debug!(
target: "reth-bench",
engine_url = %self.benchmark.engine_rpc_url,
%derived,
"Derived WebSocket RPC URL from engine RPC URL"
);
Ok(derived)
}
}
/// Establishes a websocket connection and subscribes to `reth_subscribePersistedBlock`.
async fn setup_persistence_subscription(&self) -> eyre::Result<PersistenceSubscription> {
let ws_url = self.derive_ws_rpc_url()?;
info!("Connecting to WebSocket at {} for persistence subscription", ws_url);
let ws_connect = WsConnect::new(ws_url.to_string());
let client = RpcClient::connect_pubsub(ws_connect)
.await
.wrap_err("Failed to connect to WebSocket RPC endpoint")?;
let provider: RootProvider<Ethereum> = RootProvider::new(client);
let subscription = provider
.subscribe_to::<BlockNumHash>("reth_subscribePersistedBlock")
.await
.wrap_err("Failed to subscribe to persistence notifications")?;
info!("Subscribed to persistence notifications");
Ok(PersistenceSubscription::new(provider, subscription.into_stream()))
}
}
/// Converts an engine API URL to the default RPC websocket URL.
///
/// Transformations:
/// - `http` → `ws`
/// - `https` → `wss`
/// - `ws` / `wss` keep their scheme
/// - Port is always set to `8546`, reth's default RPC websocket port.
///
/// This is used when we only know the engine API URL (typically `:8551`) but
/// need to connect to the node's WS RPC endpoint for persistence events.
fn engine_url_to_ws_url(engine_url: &str) -> eyre::Result<Url> {
let url: Url = engine_url
.parse()
.wrap_err_with(|| format!("Failed to parse engine RPC URL: {engine_url}"))?;
let mut ws_url = url.clone();
match ws_url.scheme() {
"http" => ws_url
.set_scheme("ws")
.map_err(|_| eyre::eyre!("Failed to set WS scheme for URL: {url}"))?,
"https" => ws_url
.set_scheme("wss")
.map_err(|_| eyre::eyre!("Failed to set WSS scheme for URL: {url}"))?,
"ws" | "wss" => {}
scheme => {
return Err(eyre::eyre!(
"Unsupported URL scheme '{scheme}' for URL: {url}. Expected http, https, ws, or wss."
))
}
}
ws_url.set_port(Some(8546)).map_err(|_| eyre::eyre!("Failed to set port for URL: {url}"))?;
Ok(ws_url)
}
/// Waits until the persistence subscription reports that `target` has been persisted.
///
/// Consumes subscription events until `last_persisted >= target`, or returns an error if:
/// - the subscription stream ends unexpectedly, or
/// - `timeout` elapses before `target` is observed.
async fn wait_for_persistence(
stream: &mut SubscriptionStream<BlockNumHash>,
target: u64,
last_persisted: &mut u64,
timeout: Duration,
) -> eyre::Result<()> {
tokio::time::timeout(timeout, async {
while *last_persisted < target {
match stream.next().await {
Some(persisted) => {
*last_persisted = persisted.number;
debug!(
target: "reth-bench",
persisted_block = ?last_persisted,
"Received persistence notification"
);
}
None => {
return Err(eyre::eyre!("Persistence subscription closed unexpectedly"));
}
}
}
Ok(())
})
.await
.map_err(|_| {
eyre::eyre!(
"Persistence timeout: target block {} not persisted within {:?}. Last persisted: {}",
target,
timeout,
last_persisted
)
})?
}
/// Wrapper that keeps both the subscription stream and the underlying provider alive.
/// The provider must be kept alive for the subscription to continue receiving events.
struct PersistenceSubscription {
_provider: RootProvider<Ethereum>,
stream: SubscriptionStream<BlockNumHash>,
}
impl PersistenceSubscription {
const fn new(
provider: RootProvider<Ethereum>,
stream: SubscriptionStream<BlockNumHash>,
) -> Self {
Self { _provider: provider, stream }
}
const fn stream_mut(&mut self) -> &mut SubscriptionStream<BlockNumHash> {
&mut self.stream
}
}
/// Encapsulates the block waiting logic.
///
/// Provides a simple `on_block()` interface that handles both:
/// - Fixed duration waits (when `wait_time` is set)
/// - Persistence-based waits (when `subscription` is set)
///
/// For persistence mode, waits after every `(threshold + 1)` blocks.
struct PersistenceWaiter {
wait_time: Option<Duration>,
subscription: Option<PersistenceSubscription>,
blocks_sent: u64,
last_persisted: u64,
threshold: u64,
timeout: Duration,
}
impl PersistenceWaiter {
const fn with_duration(wait_time: Duration) -> Self {
Self {
wait_time: Some(wait_time),
subscription: None,
blocks_sent: 0,
last_persisted: 0,
threshold: 0,
timeout: Duration::ZERO,
}
}
const fn with_subscription(
subscription: PersistenceSubscription,
threshold: u64,
timeout: Duration,
) -> Self {
Self {
wait_time: None,
subscription: Some(subscription),
blocks_sent: 0,
last_persisted: 0,
threshold,
timeout,
}
}
/// Called once per block. Waits based on the configured mode.
#[allow(clippy::manual_is_multiple_of)]
async fn on_block(&mut self, block_number: u64) -> eyre::Result<()> {
if let Some(wait_time) = self.wait_time {
tokio::time::sleep(wait_time).await;
return Ok(());
}
let Some(ref mut subscription) = self.subscription else {
return Ok(());
};
self.blocks_sent += 1;
if self.blocks_sent % (self.threshold + 1) == 0 {
debug!(
target: "reth-bench",
target_block = ?block_number,
last_persisted = self.last_persisted,
blocks_sent = self.blocks_sent,
"Waiting for persistence"
);
wait_for_persistence(
subscription.stream_mut(),
block_number,
&mut self.last_persisted,
self.timeout,
)
.await?;
debug!(
target: "reth-bench",
persisted = self.last_persisted,
"Persistence caught up"
);
}
Ok(())
}
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_engine_url_to_ws_url() {
// http -> ws, always uses port 8546
let result = engine_url_to_ws_url("http://localhost:8551").unwrap();
assert_eq!(result.as_str(), "ws://localhost:8546/");
// https -> wss
let result = engine_url_to_ws_url("https://localhost:8551").unwrap();
assert_eq!(result.as_str(), "wss://localhost:8546/");
// Custom engine port still maps to 8546
let result = engine_url_to_ws_url("http://localhost:9551").unwrap();
assert_eq!(result.port(), Some(8546));
// Already ws passthrough
let result = engine_url_to_ws_url("ws://localhost:8546").unwrap();
assert_eq!(result.scheme(), "ws");
// Invalid inputs
assert!(engine_url_to_ws_url("ftp://localhost:8551").is_err());
assert!(engine_url_to_ws_url("not a valid url").is_err());
}
#[tokio::test]
async fn test_waiter_with_duration() {
let mut waiter = PersistenceWaiter::with_duration(Duration::from_millis(1));
let start = Instant::now();
waiter.on_block(1).await.unwrap();
waiter.on_block(2).await.unwrap();
waiter.on_block(3).await.unwrap();
// Should have waited ~3ms total
assert!(start.elapsed() >= Duration::from_millis(3));
}
}

View File

@@ -116,6 +116,11 @@ jemalloc-prof = [
"reth-cli-util/jemalloc",
"reth-cli-util/jemalloc-prof",
"reth-ethereum-cli/jemalloc-prof",
"reth-node-metrics/jemalloc-prof",
]
jemalloc-symbols = [
"jemalloc-prof",
"reth-ethereum-cli/jemalloc-symbols",
]
jemalloc-unprefixed = [
"reth-cli-util/jemalloc-unprefixed",
@@ -166,6 +171,8 @@ min-trace-logs = [
"reth-node-core/min-trace-logs",
]
edge = ["reth-ethereum-cli/edge"]
[[bin]]
name = "reth"
path = "src/main.rs"

View File

@@ -2,22 +2,46 @@
//!
//! ## Feature Flags
//!
//! ### Default Features
//!
//! - `jemalloc`: Uses [jemallocator](https://github.com/tikv/jemallocator) as the global allocator.
//! This is **not recommended on Windows**. See [here](https://rust-lang.github.io/rfcs/1974-global-allocators.html#jemalloc)
//! for more info.
//! - `otlp`: Enables [OpenTelemetry](https://opentelemetry.io/) metrics export to a configured OTLP
//! collector endpoint.
//! - `js-tracer`: Enables the `JavaScript` tracer for the `debug_trace` endpoints, allowing custom
//! `JavaScript`-based transaction tracing.
//! - `keccak-cache-global`: Enables global caching for Keccak256 hashes to improve performance.
//! - `asm-keccak`: Replaces the default, pure-Rust implementation of Keccak256 with one implemented
//! in assembly; see [the `keccak-asm` crate](https://github.com/DaniPopes/keccak-asm) for more
//! details and supported targets.
//!
//! ### Allocator Features
//!
//! - `jemalloc-prof`: Enables [jemallocator's](https://github.com/tikv/jemallocator) heap profiling
//! and leak detection functionality. See [jemalloc's opt.prof](https://jemalloc.net/jemalloc.3.html#opt.prof)
//! documentation for usage details. This is **not recommended on Windows**. See [here](https://rust-lang.github.io/rfcs/1974-global-allocators.html#jemalloc)
//! for more info.
//! - `asm-keccak`: replaces the default, pure-Rust implementation of Keccak256 with one implemented
//! in assembly; see [the `keccak-asm` crate](https://github.com/DaniPopes/keccak-asm) for more
//! details and supported targets
//! documentation for usage details. This is **not recommended on Windows**.
//! - `jemalloc-symbols`: Enables jemalloc symbols for profiling. Includes `jemalloc-prof`.
//! - `jemalloc-unprefixed`: Uses unprefixed jemalloc symbols.
//! - `tracy-allocator`: Enables [Tracy](https://github.com/wolfpld/tracy) profiler allocator
//! integration for memory profiling.
//! - `snmalloc`: Uses [snmalloc](https://github.com/snmalloc/snmalloc) as the global allocator. Use
//! `--no-default-features` when enabling this, as jemalloc takes precedence.
//! - `snmalloc-native`: Uses snmalloc with native CPU optimizations. Use `--no-default-features`
//! when enabling this.
//!
//! ### Log Level Features
//!
//! - `min-error-logs`: Disables all logs below `error` level.
//! - `min-warn-logs`: Disables all logs below `warn` level.
//! - `min-info-logs`: Disables all logs below `info` level. This can speed up the node, since fewer
//! calls to the logging component are made.
//! - `min-debug-logs`: Disables all logs below `debug` level.
//! - `min-trace-logs`: Disables all logs below `trace` level.
//!
//! ### Development Features
//!
//! - `dev`: Enables development mode features, including test vector generation commands.
#![doc(
html_logo_url = "https://raw.githubusercontent.com/paradigmxyz/reth/main/assets/reth-docs.png",
@@ -170,7 +194,7 @@ pub mod rpc {
pub use reth_rpc::eth::*;
}
/// Re-exported from `reth_rpc::rpc`.
/// Re-exported from `reth_rpc_server_types::result`.
pub mod result {
pub use reth_rpc_server_types::result::*;
}

View File

@@ -3,6 +3,10 @@
#[global_allocator]
static ALLOC: reth_cli_util::allocator::Allocator = reth_cli_util::allocator::new_allocator();
#[cfg(all(feature = "jemalloc-prof", unix))]
#[unsafe(export_name = "_rjem_malloc_conf")]
static MALLOC_CONF: &[u8] = b"prof:true,prof_active:true,lg_prof_sample:19\0";
use clap::Parser;
use reth::{args::RessArgs, cli::Cli, ress::install_ress_subprotocol};
use reth_ethereum_cli::chainspec::EthereumChainSpecParser;

View File

@@ -33,6 +33,7 @@ where
) -> Self {
let (finalized_block, _) = watch::channel(finalized);
let (safe_block, _) = watch::channel(safe);
let (persisted_block, _) = watch::channel(None);
Self {
inner: Arc::new(ChainInfoInner {
@@ -42,6 +43,7 @@ where
canonical_head: RwLock::new(head),
safe_block,
finalized_block,
persisted_block,
}),
}
}
@@ -97,6 +99,11 @@ where
self.inner.finalized_block.borrow().as_ref().map(SealedHeader::num_hash)
}
/// Returns the `BlockNumHash` of the persisted block.
pub fn get_persisted_num_hash(&self) -> Option<BlockNumHash> {
*self.inner.persisted_block.borrow()
}
/// Sets the canonical head of the chain.
pub fn set_canonical_head(&self, header: SealedHeader<N::BlockHeader>) {
let number = header.number();
@@ -130,6 +137,18 @@ where
});
}
/// Sets the persisted block of the chain.
pub fn set_persisted(&self, num_hash: BlockNumHash) {
self.inner.persisted_block.send_if_modified(|current| {
if current.map(|b| b.hash) != Some(num_hash.hash) {
let _ = current.replace(num_hash);
return true
}
false
});
}
/// Subscribe to the finalized block.
pub fn subscribe_finalized_block(
&self,
@@ -141,6 +160,11 @@ where
pub fn subscribe_safe_block(&self) -> watch::Receiver<Option<SealedHeader<N::BlockHeader>>> {
self.inner.safe_block.subscribe()
}
/// Subscribe to the persisted block.
pub fn subscribe_persisted_block(&self) -> watch::Receiver<Option<BlockNumHash>> {
self.inner.persisted_block.subscribe()
}
}
/// Container type for all chain info fields
@@ -159,11 +183,14 @@ struct ChainInfoInner<N: NodePrimitives = reth_ethereum_primitives::EthPrimitive
safe_block: watch::Sender<Option<SealedHeader<N::BlockHeader>>>,
/// The block that the beacon node considers finalized.
finalized_block: watch::Sender<Option<SealedHeader<N::BlockHeader>>>,
/// The last block that was persisted to disk.
persisted_block: watch::Sender<Option<BlockNumHash>>,
}
#[cfg(test)]
mod tests {
use super::*;
use alloy_primitives::B256;
use reth_ethereum_primitives::EthPrimitives;
use reth_testing_utils::{generators, generators::random_header};
@@ -338,4 +365,28 @@ mod tests {
// Assert that the BlockNumHash returned matches the safe header
assert_eq!(tracker.get_safe_num_hash(), Some(safe_header.num_hash()));
}
#[test]
fn test_set_persisted() {
let mut rng = generators::rng();
let header = random_header(&mut rng, 10, None);
let tracker: ChainInfoTracker<EthPrimitives> = ChainInfoTracker::new(header, None, None);
// Initial state: persisted block should be None
assert!(tracker.get_persisted_num_hash().is_none());
// Set a persisted block
let num_hash1 = BlockNumHash::new(10, B256::random());
tracker.set_persisted(num_hash1);
assert_eq!(tracker.get_persisted_num_hash(), Some(num_hash1));
// Setting the same block again should not change anything
tracker.set_persisted(num_hash1);
assert_eq!(tracker.get_persisted_num_hash(), Some(num_hash1));
// Set a different block
let num_hash2 = BlockNumHash::new(20, B256::random());
tracker.set_persisted(num_hash2);
assert_eq!(tracker.get_persisted_num_hash(), Some(num_hash2));
}
}

View File

@@ -37,12 +37,19 @@ pub struct ComputedTrieData {
/// Trie input bundled with its anchor hash.
///
/// This is used to store the trie input and anchor hash for a block together.
/// The `trie_input` contains the **cumulative** overlay of all in-memory ancestor blocks,
/// not just this block's changes. Child blocks reuse the parent's overlay in O(1) by
/// cloning the Arc-wrapped data.
///
/// The `anchor_hash` is metadata indicating which persisted base state this overlay
/// sits on top of. It is CRITICAL for overlay reuse decisions: an overlay built on top
/// of Anchor A cannot be reused for a block anchored to Anchor B, as it would result
/// in an incorrect state.
#[derive(Clone, Debug)]
pub struct AnchoredTrieInput {
/// The persisted ancestor hash this trie input is anchored to.
pub anchor_hash: B256,
/// Trie input constructed from in-memory overlays.
/// Cumulative trie input overlay from all in-memory ancestors.
pub trie_input: Arc<TrieInputSorted>,
}
@@ -62,7 +69,8 @@ static DEFERRED_TRIE_METRICS: LazyLock<DeferredTrieMetrics> =
/// Internal state for deferred trie data.
enum DeferredState {
/// Data is not yet available; raw inputs stored for fallback computation.
Pending(PendingInputs),
/// Wrapped in `Option` to allow taking ownership during computation.
Pending(Option<PendingInputs>),
/// Data has been computed and is ready.
Ready(ComputedTrieData),
}
@@ -112,12 +120,12 @@ impl DeferredTrieData {
ancestors: Vec<Self>,
) -> Self {
Self {
state: Arc::new(Mutex::new(DeferredState::Pending(PendingInputs {
state: Arc::new(Mutex::new(DeferredState::Pending(Some(PendingInputs {
hashed_state,
trie_updates,
anchor_hash,
ancestors,
}))),
})))),
}
}
@@ -138,8 +146,9 @@ impl DeferredTrieData {
///
/// # Process
/// 1. Sort the current block's hashed state and trie updates
/// 2. Merge ancestor overlays (oldest -> newest, so later state takes precedence)
/// 3. Extend the merged overlay with this block's sorted data
/// 2. Reuse parent's cached overlay if available (O(1) - the common case)
/// 3. Otherwise, rebuild overlay from ancestors (rare fallback)
/// 4. Extend the overlay with this block's sorted data
///
/// Used by both the async background task and the synchronous fallback path.
///
@@ -147,49 +156,103 @@ impl DeferredTrieData {
/// * `hashed_state` - Unsorted hashed post-state (account/storage changes) from execution
/// * `trie_updates` - Unsorted trie node updates from state root computation
/// * `anchor_hash` - The persisted ancestor hash this trie input is anchored to
/// * `ancestors` - Deferred trie data from ancestor blocks for merging
/// * `ancestors` - Deferred trie data from ancestor blocks for merging (oldest -> newest)
pub fn sort_and_build_trie_input(
hashed_state: &HashedPostState,
trie_updates: &TrieUpdates,
hashed_state: Arc<HashedPostState>,
trie_updates: Arc<TrieUpdates>,
anchor_hash: B256,
ancestors: &[Self],
) -> ComputedTrieData {
// Sort the current block's hashed state and trie updates
let sorted_hashed_state = Arc::new(hashed_state.clone_into_sorted());
let sorted_trie_updates = Arc::new(trie_updates.clone_into_sorted());
let sorted_hashed_state = match Arc::try_unwrap(hashed_state) {
Ok(state) => state.into_sorted(),
Err(arc) => arc.clone_into_sorted(),
};
let sorted_trie_updates = match Arc::try_unwrap(trie_updates) {
Ok(updates) => updates.into_sorted(),
Err(arc) => arc.clone_into_sorted(),
};
// Merge trie data from ancestors (oldest -> newest so later state takes precedence)
let mut overlay = TrieInputSorted::default();
for ancestor in ancestors {
let ancestor_data = ancestor.wait_cloned();
{
let state_mut = Arc::make_mut(&mut overlay.state);
state_mut.extend_ref(ancestor_data.hashed_state.as_ref());
}
{
let nodes_mut = Arc::make_mut(&mut overlay.nodes);
nodes_mut.extend_ref(ancestor_data.trie_updates.as_ref());
}
}
// Reuse parent's overlay if available and anchors match.
// We can only reuse the parent's overlay if it was built on top of the same
// persisted anchor. If the anchor has changed (e.g., due to persistence),
// the parent's overlay is relative to an old state and cannot be used.
let overlay = if let Some(parent) = ancestors.last() {
let parent_data = parent.wait_cloned();
// Extend overlay with current block's sorted data
{
let state_mut = Arc::make_mut(&mut overlay.state);
state_mut.extend_ref(sorted_hashed_state.as_ref());
}
{
let nodes_mut = Arc::make_mut(&mut overlay.nodes);
nodes_mut.extend_ref(sorted_trie_updates.as_ref());
}
match &parent_data.anchored_trie_input {
// Case 1: Parent has cached overlay AND anchors match.
Some(AnchoredTrieInput { anchor_hash: parent_anchor, trie_input })
if *parent_anchor == anchor_hash =>
{
// O(1): Reuse parent's overlay, extend with current block's data.
let mut overlay = TrieInputSorted::new(
Arc::clone(&trie_input.nodes),
Arc::clone(&trie_input.state),
Default::default(), // prefix_sets are per-block, not cumulative
);
// Only trigger COW clone if there's actually data to add.
if !sorted_hashed_state.is_empty() {
Arc::make_mut(&mut overlay.state).extend_ref(&sorted_hashed_state);
}
if !sorted_trie_updates.is_empty() {
Arc::make_mut(&mut overlay.nodes).extend_ref(&sorted_trie_updates);
}
overlay
}
// Case 2: Parent exists but anchor mismatch or no cached overlay.
// We must rebuild from the ancestors list (which only contains unpersisted blocks).
_ => Self::merge_ancestors_into_overlay(
ancestors,
&sorted_hashed_state,
&sorted_trie_updates,
),
}
} else {
// Case 3: No in-memory ancestors (first block after persisted anchor).
// Build overlay with just this block's data.
Self::merge_ancestors_into_overlay(&[], &sorted_hashed_state, &sorted_trie_updates)
};
ComputedTrieData::with_trie_input(
sorted_hashed_state,
sorted_trie_updates,
Arc::new(sorted_hashed_state),
Arc::new(sorted_trie_updates),
anchor_hash,
Arc::new(overlay),
)
}
/// Merge all ancestors and current block's data into a single overlay.
///
/// This is a rare fallback path, only used when no ancestor has a cached
/// `anchored_trie_input` (e.g., blocks created via alternative constructors).
/// In normal operation, the parent always has a cached overlay and this
/// function is never called.
///
/// Iterates ancestors oldest -> newest, then extends with current block's data,
/// so later state takes precedence.
fn merge_ancestors_into_overlay(
ancestors: &[Self],
sorted_hashed_state: &HashedPostStateSorted,
sorted_trie_updates: &TrieUpdatesSorted,
) -> TrieInputSorted {
let mut overlay = TrieInputSorted::default();
let state_mut = Arc::make_mut(&mut overlay.state);
let nodes_mut = Arc::make_mut(&mut overlay.nodes);
for ancestor in ancestors {
let ancestor_data = ancestor.wait_cloned();
state_mut.extend_ref(ancestor_data.hashed_state.as_ref());
nodes_mut.extend_ref(ancestor_data.trie_updates.as_ref());
}
// Extend with current block's sorted data last (takes precedence)
state_mut.extend_ref(sorted_hashed_state);
nodes_mut.extend_ref(sorted_trie_updates);
overlay
}
/// Returns trie data, computing synchronously if the async task hasn't completed.
///
/// - If the async task has completed (`Ready`), returns the cached result.
@@ -204,7 +267,7 @@ impl DeferredTrieData {
#[instrument(level = "debug", target = "engine::tree::deferred_trie", skip_all)]
pub fn wait_cloned(&self) -> ComputedTrieData {
let mut state = self.state.lock();
match &*state {
match &mut *state {
// If the deferred trie data is ready, return the cached result.
DeferredState::Ready(bundle) => {
DEFERRED_TRIE_METRICS.deferred_trie_async_ready.increment(1);
@@ -212,11 +275,14 @@ impl DeferredTrieData {
}
// If the deferred trie data is pending, compute the trie data synchronously and return
// the result. This is the fallback path if the async task hasn't completed.
DeferredState::Pending(inputs) => {
DeferredState::Pending(maybe_inputs) => {
DEFERRED_TRIE_METRICS.deferred_trie_sync_fallback.increment(1);
let inputs = maybe_inputs.take().expect("inputs must be present in Pending state");
let computed = Self::sort_and_build_trie_input(
&inputs.hashed_state,
&inputs.trie_updates,
inputs.hashed_state,
inputs.trie_updates,
inputs.anchor_hash,
&inputs.ancestors,
);
@@ -441,4 +507,365 @@ mod tests {
let (_, account) = &overlay_state[0];
assert_eq!(account.unwrap().nonce, 2);
}
/// Helper to create a ready block with anchored trie input containing specific state.
fn ready_block_with_state(
anchor_hash: B256,
accounts: Vec<(B256, Option<Account>)>,
) -> DeferredTrieData {
let hashed_state = Arc::new(HashedPostStateSorted::new(accounts, B256Map::default()));
let trie_updates = Arc::default();
let mut overlay = TrieInputSorted::default();
Arc::make_mut(&mut overlay.state).extend_ref(hashed_state.as_ref());
DeferredTrieData::ready(ComputedTrieData {
hashed_state,
trie_updates,
anchored_trie_input: Some(AnchoredTrieInput {
anchor_hash,
trie_input: Arc::new(overlay),
}),
})
}
/// Verifies that first block after anchor (no ancestors) creates empty base overlay.
#[test]
fn first_block_after_anchor_creates_empty_base() {
let anchor = B256::with_last_byte(1);
let key = B256::with_last_byte(42);
let account = Account { nonce: 1, balance: U256::ZERO, bytecode_hash: None };
// First block after anchor - no ancestors
let first_block = DeferredTrieData::pending(
Arc::new(HashedPostState::default().with_accounts([(key, Some(account))])),
Arc::new(TrieUpdates::default()),
anchor,
vec![], // No ancestors
);
let result = first_block.wait_cloned();
// Should have overlay with just this block's data
let overlay = result.anchored_trie_input.as_ref().unwrap();
assert_eq!(overlay.anchor_hash, anchor);
assert_eq!(overlay.trie_input.state.accounts.len(), 1);
let (found_key, found_account) = &overlay.trie_input.state.accounts[0];
assert_eq!(*found_key, key);
assert_eq!(found_account.unwrap().nonce, 1);
}
/// Verifies that parent's overlay is reused regardless of anchor.
#[test]
fn reuses_parent_overlay() {
let anchor = B256::with_last_byte(1);
let key = B256::with_last_byte(42);
let account = Account { nonce: 100, balance: U256::ZERO, bytecode_hash: None };
// Create parent with anchored trie input
let parent = ready_block_with_state(anchor, vec![(key, Some(account))]);
// Create child - should reuse parent's overlay
let child = DeferredTrieData::pending(
Arc::new(HashedPostState::default()),
Arc::new(TrieUpdates::default()),
anchor,
vec![parent],
);
let result = child.wait_cloned();
// Verify parent's account is in the overlay
let overlay = result.anchored_trie_input.as_ref().unwrap();
assert_eq!(overlay.anchor_hash, anchor);
assert_eq!(overlay.trie_input.state.accounts.len(), 1);
let (found_key, found_account) = &overlay.trie_input.state.accounts[0];
assert_eq!(*found_key, key);
assert_eq!(found_account.unwrap().nonce, 100);
}
/// Verifies that parent's overlay is NOT reused when anchor changes (after persist).
/// The overlay data is dependent on the anchor, so it must be rebuilt from the
/// remaining ancestors.
#[test]
fn rebuilds_overlay_when_anchor_changes() {
let old_anchor = B256::with_last_byte(1);
let new_anchor = B256::with_last_byte(2);
let key = B256::with_last_byte(42);
let account = Account { nonce: 50, balance: U256::ZERO, bytecode_hash: None };
// Create parent with OLD anchor
let parent = ready_block_with_state(old_anchor, vec![(key, Some(account))]);
// Create child with NEW anchor (simulates after persist)
// Should NOT reuse parent's overlay because anchor changed
let child = DeferredTrieData::pending(
Arc::new(HashedPostState::default()),
Arc::new(TrieUpdates::default()),
new_anchor,
vec![parent],
);
let result = child.wait_cloned();
// Verify result uses new anchor
let overlay = result.anchored_trie_input.as_ref().unwrap();
assert_eq!(overlay.anchor_hash, new_anchor);
// Crucially, since we provided `parent` in ancestors but it has a different anchor,
// the code falls back to `merge_ancestors_into_overlay`.
// `merge_ancestors_into_overlay` reads `parent.hashed_state` (which has the account).
// So the account IS present, but it was obtained via REBUILD, not REUSE.
// We can check `DEFERRED_TRIE_METRICS` if we want to be sure, but functionally:
assert_eq!(overlay.trie_input.state.accounts.len(), 1);
let (found_key, found_account) = &overlay.trie_input.state.accounts[0];
assert_eq!(*found_key, key);
assert_eq!(found_account.unwrap().nonce, 50);
}
/// Verifies that parent without `anchored_trie_input` triggers rebuild path.
#[test]
fn rebuilds_when_parent_has_no_anchored_input() {
let anchor = B256::with_last_byte(1);
let key = B256::with_last_byte(42);
let account = Account { nonce: 25, balance: U256::ZERO, bytecode_hash: None };
// Create parent WITHOUT anchored trie input (e.g., from without_trie_input constructor)
let parent_state =
HashedPostStateSorted::new(vec![(key, Some(account))], B256Map::default());
let parent = DeferredTrieData::ready(ComputedTrieData {
hashed_state: Arc::new(parent_state),
trie_updates: Arc::default(),
anchored_trie_input: None, // No anchored input
});
// Create child - should rebuild from parent's hashed_state
let child = DeferredTrieData::pending(
Arc::new(HashedPostState::default()),
Arc::new(TrieUpdates::default()),
anchor,
vec![parent],
);
let result = child.wait_cloned();
// Verify overlay is built and contains parent's data
let overlay = result.anchored_trie_input.as_ref().unwrap();
assert_eq!(overlay.anchor_hash, anchor);
assert_eq!(overlay.trie_input.state.accounts.len(), 1);
}
/// Verifies that a chain of blocks with matching anchors builds correct cumulative overlay.
#[test]
fn chain_of_blocks_builds_cumulative_overlay() {
let anchor = B256::with_last_byte(1);
let key1 = B256::with_last_byte(1);
let key2 = B256::with_last_byte(2);
let key3 = B256::with_last_byte(3);
// Block 1: sets account at key1
let block1 = ready_block_with_state(
anchor,
vec![(key1, Some(Account { nonce: 1, balance: U256::ZERO, bytecode_hash: None }))],
);
// Block 2: adds account at key2, ancestor is block1
let block2_hashed = HashedPostState::default().with_accounts([(
key2,
Some(Account { nonce: 2, balance: U256::ZERO, bytecode_hash: None }),
)]);
let block2 = DeferredTrieData::pending(
Arc::new(block2_hashed),
Arc::new(TrieUpdates::default()),
anchor,
vec![block1.clone()],
);
// Compute block2's trie data
let block2_computed = block2.wait_cloned();
let block2_ready = DeferredTrieData::ready(block2_computed);
// Block 3: adds account at key3, ancestor is block2 (which includes block1)
let block3_hashed = HashedPostState::default().with_accounts([(
key3,
Some(Account { nonce: 3, balance: U256::ZERO, bytecode_hash: None }),
)]);
let block3 = DeferredTrieData::pending(
Arc::new(block3_hashed),
Arc::new(TrieUpdates::default()),
anchor,
vec![block1, block2_ready],
);
let result = block3.wait_cloned();
// Verify all three accounts are in the cumulative overlay
let overlay = result.anchored_trie_input.as_ref().unwrap();
assert_eq!(overlay.trie_input.state.accounts.len(), 3);
// Accounts should be sorted by key (B256 ordering)
let accounts = &overlay.trie_input.state.accounts;
assert!(accounts.iter().any(|(k, a)| *k == key1 && a.unwrap().nonce == 1));
assert!(accounts.iter().any(|(k, a)| *k == key2 && a.unwrap().nonce == 2));
assert!(accounts.iter().any(|(k, a)| *k == key3 && a.unwrap().nonce == 3));
}
/// Verifies that child block's state overwrites parent's state for the same key.
#[test]
fn child_state_overwrites_parent() {
let anchor = B256::with_last_byte(1);
let key = B256::with_last_byte(42);
// Parent sets nonce to 10
let parent = ready_block_with_state(
anchor,
vec![(key, Some(Account { nonce: 10, balance: U256::ZERO, bytecode_hash: None }))],
);
// Child overwrites nonce to 99
let child_hashed = HashedPostState::default().with_accounts([(
key,
Some(Account { nonce: 99, balance: U256::ZERO, bytecode_hash: None }),
)]);
let child = DeferredTrieData::pending(
Arc::new(child_hashed),
Arc::new(TrieUpdates::default()),
anchor,
vec![parent],
);
let result = child.wait_cloned();
// Verify child's value wins (extend_ref uses later value)
let overlay = result.anchored_trie_input.as_ref().unwrap();
// Note: extend_ref may result in duplicate keys; check the last occurrence
let accounts = &overlay.trie_input.state.accounts;
let last_account = accounts.iter().rfind(|(k, _)| *k == key).unwrap();
assert_eq!(last_account.1.unwrap().nonce, 99);
}
/// Stress test: verify O(N) behavior by building a chain of many blocks.
/// This test ensures the fix doesn't regress - previously this would be O(N²).
#[test]
fn long_chain_builds_in_linear_time() {
let anchor = B256::with_last_byte(1);
let num_blocks = 50; // Enough to notice O(N²) vs O(N) difference
let mut ancestors: Vec<DeferredTrieData> = Vec::new();
let start = Instant::now();
for i in 0..num_blocks {
let key = B256::with_last_byte(i as u8);
let account = Account { nonce: i as u64, balance: U256::ZERO, bytecode_hash: None };
let hashed = HashedPostState::default().with_accounts([(key, Some(account))]);
let block = DeferredTrieData::pending(
Arc::new(hashed),
Arc::new(TrieUpdates::default()),
anchor,
ancestors.clone(),
);
// Compute and add to ancestors for next iteration
let computed = block.wait_cloned();
ancestors.push(DeferredTrieData::ready(computed));
}
let elapsed = start.elapsed();
// With O(N) fix, 50 blocks should complete quickly (< 1 second)
// With O(N²), this would take significantly longer
assert!(
elapsed < Duration::from_secs(2),
"Chain of {num_blocks} blocks took {:?}, possible O(N²) regression",
elapsed
);
// Verify final overlay has all accounts
let final_result = ancestors.last().unwrap().wait_cloned();
let overlay = final_result.anchored_trie_input.as_ref().unwrap();
assert_eq!(overlay.trie_input.state.accounts.len(), num_blocks);
}
/// Verifies that a multi-ancestor overlay is rebuilt when anchor changes.
/// This simulates the "persist prefix then keep building" scenario where:
/// 1. A chain of blocks is built with anchor A
/// 2. Some blocks are persisted, changing anchor to B
/// 3. New blocks must rebuild the overlay from the remaining ancestors
#[test]
fn multi_ancestor_overlay_rebuilt_after_anchor_change() {
let old_anchor = B256::with_last_byte(1);
let new_anchor = B256::with_last_byte(2);
let key1 = B256::with_last_byte(1);
let key2 = B256::with_last_byte(2);
let key3 = B256::with_last_byte(3);
let key4 = B256::with_last_byte(4);
// Build a chain of 3 blocks with old_anchor
let block1 = ready_block_with_state(
old_anchor,
vec![(key1, Some(Account { nonce: 1, balance: U256::ZERO, bytecode_hash: None }))],
);
let block2_hashed = HashedPostState::default().with_accounts([(
key2,
Some(Account { nonce: 2, balance: U256::ZERO, bytecode_hash: None }),
)]);
let block2 = DeferredTrieData::pending(
Arc::new(block2_hashed),
Arc::new(TrieUpdates::default()),
old_anchor,
vec![block1.clone()],
);
let block2_ready = DeferredTrieData::ready(block2.wait_cloned());
let block3_hashed = HashedPostState::default().with_accounts([(
key3,
Some(Account { nonce: 3, balance: U256::ZERO, bytecode_hash: None }),
)]);
let block3 = DeferredTrieData::pending(
Arc::new(block3_hashed),
Arc::new(TrieUpdates::default()),
old_anchor,
vec![block1.clone(), block2_ready.clone()],
);
let block3_ready = DeferredTrieData::ready(block3.wait_cloned());
// Verify block3's overlay has all 3 accounts with old_anchor
let block3_overlay = block3_ready.wait_cloned().anchored_trie_input.unwrap();
assert_eq!(block3_overlay.anchor_hash, old_anchor);
assert_eq!(block3_overlay.trie_input.state.accounts.len(), 3);
// Now simulate persist: create block4 with NEW anchor but same ancestors.
// To verify correct rebuilding, we must provide ALL unpersisted ancestors.
// If we only provided block3, the rebuild would only see block3's state.
// We pass block1, block2, block3 to simulate that they are all still in memory
// but the anchor check forces a rebuild (e.g. artificial anchor change).
let block4_hashed = HashedPostState::default().with_accounts([(
key4,
Some(Account { nonce: 4, balance: U256::ZERO, bytecode_hash: None }),
)]);
let block4 = DeferredTrieData::pending(
Arc::new(block4_hashed),
Arc::new(TrieUpdates::default()),
new_anchor, // Different anchor - simulates post-persist
vec![block1, block2_ready, block3_ready],
);
let result = block4.wait_cloned();
// Verify:
// 1. New anchor is used in result
assert_eq!(result.anchor_hash(), Some(new_anchor));
// 2. All 4 accounts are in the overlay (rebuilt from ancestors + extended)
let overlay = result.anchored_trie_input.as_ref().unwrap();
assert_eq!(overlay.trie_input.state.accounts.len(), 4);
// 3. All accounts have correct values
let accounts = &overlay.trie_input.state.accounts;
assert!(accounts.iter().any(|(k, a)| *k == key1 && a.unwrap().nonce == 1));
assert!(accounts.iter().any(|(k, a)| *k == key2 && a.unwrap().nonce == 2));
assert!(accounts.iter().any(|(k, a)| *k == key3 && a.unwrap().nonce == 3));
assert!(accounts.iter().any(|(k, a)| *k == key4 && a.unwrap().nonce == 4));
}
}

View File

@@ -317,6 +317,7 @@ impl<N: NodePrimitives> CanonicalInMemoryState<N> {
/// This will update the links between blocks and remove all blocks that are [..
/// `persisted_height`].
pub fn remove_persisted_blocks(&self, persisted_num_hash: BlockNumHash) {
self.set_persisted(persisted_num_hash);
// if the persisted hash is not in the canonical in memory state, do nothing, because it
// means canonical blocks were not actually persisted.
//
@@ -444,6 +445,11 @@ impl<N: NodePrimitives> CanonicalInMemoryState<N> {
self.inner.chain_info_tracker.set_finalized(header);
}
/// Persisted block setter.
pub fn set_persisted(&self, num_hash: BlockNumHash) {
self.inner.chain_info_tracker.set_persisted(num_hash);
}
/// Canonical head getter.
pub fn get_canonical_head(&self) -> SealedHeader<N::BlockHeader> {
self.inner.chain_info_tracker.get_canonical_head()
@@ -459,6 +465,11 @@ impl<N: NodePrimitives> CanonicalInMemoryState<N> {
self.inner.chain_info_tracker.get_safe_header()
}
/// Persisted block `BlockNumHash` getter.
pub fn get_persisted_num_hash(&self) -> Option<BlockNumHash> {
self.inner.chain_info_tracker.get_persisted_num_hash()
}
/// Returns the `SealedHeader` corresponding to the pending state.
pub fn pending_sealed_header(&self) -> Option<SealedHeader<N::BlockHeader>> {
self.pending_state().map(|h| h.block_ref().recovered_block().clone_sealed_header())
@@ -511,6 +522,11 @@ impl<N: NodePrimitives> CanonicalInMemoryState<N> {
self.inner.chain_info_tracker.subscribe_finalized_block()
}
/// Subscribe to new persisted block events.
pub fn subscribe_persisted_block(&self) -> watch::Receiver<Option<BlockNumHash>> {
self.inner.chain_info_tracker.subscribe_persisted_block()
}
/// Attempts to send a new [`CanonStateNotification`] to all active Receiver handles.
pub fn notify_canon_state(&self, event: CanonStateNotification<N>) {
self.inner.canon_state_notification_sender.send(event).ok();
@@ -930,6 +946,8 @@ impl<N: NodePrimitives<SignedTx: SignedTransaction>> NewCanonicalChain<N> {
chain.append_block(
exec.recovered_block().clone(),
exec.execution_outcome().clone(),
exec.trie_updates(),
exec.hashed_state(),
);
chain
}));
@@ -940,6 +958,8 @@ impl<N: NodePrimitives<SignedTx: SignedTransaction>> NewCanonicalChain<N> {
chain.append_block(
exec.recovered_block().clone(),
exec.execution_outcome().clone(),
exec.trie_updates(),
exec.hashed_state(),
);
chain
}));
@@ -947,6 +967,8 @@ impl<N: NodePrimitives<SignedTx: SignedTransaction>> NewCanonicalChain<N> {
chain.append_block(
exec.recovered_block().clone(),
exec.execution_outcome().clone(),
exec.trie_updates(),
exec.hashed_state(),
);
chain
}));
@@ -1530,13 +1552,24 @@ mod tests {
// Test commit notification
let chain_commit = NewCanonicalChain::Commit { new: vec![block0.clone(), block1.clone()] };
// Build expected trie updates map
let mut expected_trie_updates = BTreeMap::new();
expected_trie_updates.insert(0, block0.trie_updates());
expected_trie_updates.insert(1, block1.trie_updates());
// Build expected hashed state map
let mut expected_hashed_state = BTreeMap::new();
expected_hashed_state.insert(0, block0.hashed_state());
expected_hashed_state.insert(1, block1.hashed_state());
assert_eq!(
chain_commit.to_chain_notification(),
CanonStateNotification::Commit {
new: Arc::new(Chain::new(
vec![block0.recovered_block().clone(), block1.recovered_block().clone()],
sample_execution_outcome.clone(),
None
expected_trie_updates,
expected_hashed_state
))
}
);
@@ -1547,18 +1580,40 @@ mod tests {
old: vec![block1.clone(), block2.clone()],
};
// Build expected trie updates for old chain
let mut old_trie_updates = BTreeMap::new();
old_trie_updates.insert(1, block1.trie_updates());
old_trie_updates.insert(2, block2.trie_updates());
// Build expected trie updates for new chain
let mut new_trie_updates = BTreeMap::new();
new_trie_updates.insert(1, block1a.trie_updates());
new_trie_updates.insert(2, block2a.trie_updates());
// Build expected hashed state for old chain
let mut old_hashed_state = BTreeMap::new();
old_hashed_state.insert(1, block1.hashed_state());
old_hashed_state.insert(2, block2.hashed_state());
// Build expected hashed state for new chain
let mut new_hashed_state = BTreeMap::new();
new_hashed_state.insert(1, block1a.hashed_state());
new_hashed_state.insert(2, block2a.hashed_state());
assert_eq!(
chain_reorg.to_chain_notification(),
CanonStateNotification::Reorg {
old: Arc::new(Chain::new(
vec![block1.recovered_block().clone(), block2.recovered_block().clone()],
sample_execution_outcome.clone(),
None
old_trie_updates,
old_hashed_state
)),
new: Arc::new(Chain::new(
vec![block1a.recovered_block().clone(), block2a.recovered_block().clone()],
sample_execution_outcome,
None
new_trie_updates,
new_hashed_state
))
}
);

View File

@@ -23,7 +23,8 @@ mod notifications;
pub use notifications::{
CanonStateNotification, CanonStateNotificationSender, CanonStateNotificationStream,
CanonStateNotifications, CanonStateSubscriptions, ForkChoiceNotifications, ForkChoiceStream,
ForkChoiceSubscriptions,
ForkChoiceSubscriptions, PersistedBlockNotifications, PersistedBlockSubscriptions,
WatchValueStream,
};
mod memory_overlay;

View File

@@ -2,7 +2,7 @@
use crate::{
CanonStateNotifications, CanonStateSubscriptions, ForkChoiceNotifications,
ForkChoiceSubscriptions,
ForkChoiceSubscriptions, PersistedBlockNotifications, PersistedBlockSubscriptions,
};
use reth_primitives_traits::NodePrimitives;
use reth_storage_api::noop::NoopProvider;
@@ -27,3 +27,10 @@ impl<C: Send + Sync, N: NodePrimitives> ForkChoiceSubscriptions for NoopProvider
ForkChoiceNotifications(rx)
}
}
impl<C: Send + Sync, N: NodePrimitives> PersistedBlockSubscriptions for NoopProvider<C, N> {
fn subscribe_persisted_block(&self) -> PersistedBlockNotifications {
let (_, rx) = watch::channel(None);
PersistedBlockNotifications(rx)
}
}

View File

@@ -1,6 +1,6 @@
//! Canonical chain state notification trait and types.
use alloy_eips::eip2718::Encodable2718;
use alloy_eips::{eip2718::Encodable2718, BlockNumHash};
use derive_more::{Deref, DerefMut};
use reth_execution_types::{BlockReceipts, Chain};
use reth_primitives_traits::{NodePrimitives, RecoveredBlock, SealedHeader};
@@ -205,22 +205,22 @@ pub trait ForkChoiceSubscriptions: Send + Sync {
}
}
/// A stream for fork choice watch channels (pending, safe or finalized watchers)
/// A stream that yields values from a `watch::Receiver<Option<T>>`, filtering out `None` values.
#[derive(Debug)]
#[pin_project::pin_project]
pub struct ForkChoiceStream<T> {
pub struct WatchValueStream<T> {
#[pin]
st: WatchStream<Option<T>>,
}
impl<T: Clone + Sync + Send + 'static> ForkChoiceStream<T> {
/// Creates a new `ForkChoiceStream`
impl<T: Clone + Sync + Send + 'static> WatchValueStream<T> {
/// Creates a new [`WatchValueStream`]
pub fn new(rx: watch::Receiver<Option<T>>) -> Self {
Self { st: WatchStream::from_changes(rx) }
}
}
impl<T: Clone + Sync + Send + 'static> Stream for ForkChoiceStream<T> {
impl<T: Clone + Sync + Send + 'static> Stream for WatchValueStream<T> {
type Item = T;
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
@@ -234,6 +234,24 @@ impl<T: Clone + Sync + Send + 'static> Stream for ForkChoiceStream<T> {
}
}
/// Alias for [`WatchValueStream`] for fork choice watch channels.
pub type ForkChoiceStream<T> = WatchValueStream<T>;
/// Wrapper around a watch receiver that receives persisted block notifications.
#[derive(Debug, Deref, DerefMut)]
pub struct PersistedBlockNotifications(pub watch::Receiver<Option<BlockNumHash>>);
/// A trait that allows subscribing to persisted block events.
pub trait PersistedBlockSubscriptions: Send + Sync {
/// Get notified when a new block is persisted to disk.
fn subscribe_persisted_block(&self) -> PersistedBlockNotifications;
/// Convenience method to get a stream of the persisted blocks.
fn persisted_block_stream(&self) -> WatchValueStream<BlockNumHash> {
WatchValueStream::new(self.subscribe_persisted_block().0)
}
}
#[cfg(test)]
mod tests {
use super::*;
@@ -242,6 +260,7 @@ mod tests {
use reth_ethereum_primitives::{Receipt, TransactionSigned, TxType};
use reth_execution_types::ExecutionOutcome;
use reth_primitives_traits::SealedBlock;
use std::collections::BTreeMap;
#[test]
fn test_commit_notification() {
@@ -260,7 +279,8 @@ mod tests {
let chain: Arc<Chain> = Arc::new(Chain::new(
vec![block1.clone(), block2.clone()],
ExecutionOutcome::default(),
None,
BTreeMap::new(),
BTreeMap::new(),
));
// Create a commit notification
@@ -295,12 +315,17 @@ mod tests {
block3.set_block_number(3);
block3.set_hash(block3_hash);
let old_chain: Arc<Chain> =
Arc::new(Chain::new(vec![block1.clone()], ExecutionOutcome::default(), None));
let old_chain: Arc<Chain> = Arc::new(Chain::new(
vec![block1.clone()],
ExecutionOutcome::default(),
BTreeMap::new(),
BTreeMap::new(),
));
let new_chain = Arc::new(Chain::new(
vec![block2.clone(), block3.clone()],
ExecutionOutcome::default(),
None,
BTreeMap::new(),
BTreeMap::new(),
));
// Create a reorg notification
@@ -362,8 +387,12 @@ mod tests {
let execution_outcome = ExecutionOutcome { receipts, ..Default::default() };
// Create a new chain segment with `block1` and `block2` and the execution outcome.
let new_chain: Arc<Chain> =
Arc::new(Chain::new(vec![block1.clone(), block2.clone()], execution_outcome, None));
let new_chain: Arc<Chain> = Arc::new(Chain::new(
vec![block1.clone(), block2.clone()],
execution_outcome,
BTreeMap::new(),
BTreeMap::new(),
));
// Create a commit notification containing the new chain segment.
let notification = CanonStateNotification::Commit { new: new_chain };
@@ -420,8 +449,12 @@ mod tests {
ExecutionOutcome { receipts: old_receipts, ..Default::default() };
// Create an old chain segment to be reverted, containing `old_block1`.
let old_chain: Arc<Chain> =
Arc::new(Chain::new(vec![old_block1.clone()], old_execution_outcome, None));
let old_chain: Arc<Chain> = Arc::new(Chain::new(
vec![old_block1.clone()],
old_execution_outcome,
BTreeMap::new(),
BTreeMap::new(),
));
// Define block2 for the new chain segment, which will be committed.
let mut body = BlockBody::<TransactionSigned>::default();
@@ -449,7 +482,12 @@ mod tests {
ExecutionOutcome { receipts: new_receipts, ..Default::default() };
// Create a new chain segment to be committed, containing `new_block1`.
let new_chain = Arc::new(Chain::new(vec![new_block1.clone()], new_execution_outcome, None));
let new_chain = Arc::new(Chain::new(
vec![new_block1.clone()],
new_execution_outcome,
BTreeMap::new(),
BTreeMap::new(),
));
// Create a reorg notification with both reverted (old) and committed (new) chain segments.
let notification = CanonStateNotification::Reorg { old: old_chain, new: new_chain };

View File

@@ -129,3 +129,5 @@ arbitrary = [
"reth-primitives-traits/arbitrary",
"reth-ethereum-primitives/arbitrary",
]
edge = ["reth-db-common/edge", "reth-stages/rocksdb"]

View File

@@ -107,13 +107,13 @@ impl<C: ChainSpecParser> EnvironmentArgs<C> {
let (db, sfp) = match access {
AccessRights::RW => (
Arc::new(init_db(db_path, self.db.database_args())?),
StaticFileProviderBuilder::read_write(sf_path)?
StaticFileProviderBuilder::read_write(sf_path)
.with_genesis_block_number(genesis_block_number)
.build()?,
),
AccessRights::RO | AccessRights::RoInconsistent => {
(Arc::new(open_db_read_only(&db_path, self.db.database_args())?), {
let provider = StaticFileProviderBuilder::read_only(sf_path)?
let provider = StaticFileProviderBuilder::read_only(sf_path)
.with_genesis_block_number(genesis_block_number)
.build()?;
provider.watch_directory();

View File

@@ -2,8 +2,8 @@ use alloy_primitives::{hex, BlockHash};
use clap::Parser;
use reth_db::{
static_file::{
ColumnSelectorOne, ColumnSelectorTwo, HeaderWithHashMask, ReceiptMask, TransactionMask,
TransactionSenderMask,
AccountChangesetMask, ColumnSelectorOne, ColumnSelectorTwo, HeaderWithHashMask,
ReceiptMask, TransactionMask, TransactionSenderMask,
},
RawDupSort,
};
@@ -19,7 +19,7 @@ use reth_db_common::DbTool;
use reth_node_api::{HeaderTy, ReceiptTy, TxTy};
use reth_node_builder::NodeTypesWithDB;
use reth_primitives_traits::ValueWithSubKey;
use reth_provider::{providers::ProviderNodeTypes, StaticFileProviderFactory};
use reth_provider::{providers::ProviderNodeTypes, ChangeSetReader, StaticFileProviderFactory};
use reth_static_file_types::StaticFileSegment;
use tracing::error;
@@ -64,6 +64,10 @@ enum Subcommand {
#[arg(value_parser = maybe_json_value_parser)]
key: String,
/// The subkey to get content for, for example address in changeset
#[arg(value_parser = maybe_json_value_parser)]
subkey: Option<String>,
/// Output bytes instead of human-readable decoded value
#[arg(long)]
raw: bool,
@@ -77,33 +81,77 @@ impl Command {
Subcommand::Mdbx { table, key, subkey, end_key, end_subkey, raw } => {
table.view(&GetValueViewer { tool, key, subkey, end_key, end_subkey, raw })?
}
Subcommand::StaticFile { segment, key, raw } => {
let (key, mask): (u64, _) = match segment {
Subcommand::StaticFile { segment, key, subkey, raw } => {
let (key, subkey, mask): (u64, _, _) = match segment {
StaticFileSegment::Headers => (
table_key::<tables::Headers>(&key)?,
None,
<HeaderWithHashMask<HeaderTy<N>>>::MASK,
),
StaticFileSegment::Transactions => {
(table_key::<tables::Transactions>(&key)?, <TransactionMask<TxTy<N>>>::MASK)
}
StaticFileSegment::Receipts => {
(table_key::<tables::Receipts>(&key)?, <ReceiptMask<ReceiptTy<N>>>::MASK)
}
StaticFileSegment::Transactions => (
table_key::<tables::Transactions>(&key)?,
None,
<TransactionMask<TxTy<N>>>::MASK,
),
StaticFileSegment::Receipts => (
table_key::<tables::Receipts>(&key)?,
None,
<ReceiptMask<ReceiptTy<N>>>::MASK,
),
StaticFileSegment::TransactionSenders => (
table_key::<tables::TransactionSenders>(&key)?,
<TransactionSenderMask>::MASK,
None,
TransactionSenderMask::MASK,
),
StaticFileSegment::AccountChangeSets => {
let subkey =
table_subkey::<tables::AccountChangeSets>(subkey.as_deref()).ok();
(
table_key::<tables::AccountChangeSets>(&key)?,
subkey,
AccountChangesetMask::MASK,
)
}
};
let content = tool
.provider_factory
.static_file_provider()
.get_segment_provider(segment, key)?
.cursor()?
.get(key.into(), mask)
.map(|result| {
result.map(|vec| vec.iter().map(|slice| slice.to_vec()).collect::<Vec<_>>())
})?;
// handle account changesets differently if a subkey is provided.
if let StaticFileSegment::AccountChangeSets = segment {
let Some(subkey) = subkey else {
// get all changesets for the block
let changesets = tool
.provider_factory
.static_file_provider()
.account_block_changeset(key)?;
println!("{}", serde_json::to_string_pretty(&changesets)?);
return Ok(())
};
let account = tool
.provider_factory
.static_file_provider()
.get_account_before_block(key, subkey)?;
if let Some(account) = account {
println!("{}", serde_json::to_string_pretty(&account)?);
} else {
error!(target: "reth::cli", "No content for the given table key.");
}
return Ok(())
}
let content = tool.provider_factory.static_file_provider().find_static_file(
segment,
|provider| {
let mut cursor = provider.cursor()?;
cursor.get(key.into(), mask).map(|result| {
result.map(|vec| {
vec.iter().map(|slice| slice.to_vec()).collect::<Vec<_>>()
})
})
},
)?;
match content {
Some(content) => {
@@ -139,6 +187,9 @@ impl Command {
)?;
println!("{}", serde_json::to_string_pretty(&sender)?);
}
StaticFileSegment::AccountChangeSets => {
unreachable!("account changeset static files are special cased before this match")
}
}
}
}

View File

@@ -162,7 +162,7 @@ impl<C: ChainSpecParser<ChainSpec: EthChainSpec + EthereumHardforks>> Command<C>
let access_rights =
if command.dry_run { AccessRights::RO } else { AccessRights::RW };
db_exec!(self.env, tool, N, access_rights, {
command.execute(&tool, ctx.task_executor.clone())?;
command.execute(&tool, ctx.task_executor.clone(), &data_dir)?;
});
}
Subcommands::StaticFileHeader(command) => {

View File

@@ -9,7 +9,10 @@ use reth_db_api::{
transaction::{DbTx, DbTxMut},
};
use reth_db_common::DbTool;
use reth_node_core::version::version_metadata;
use reth_node_core::{
dirs::{ChainPath, DataDirPath},
version::version_metadata,
};
use reth_node_metrics::{
chain::ChainSpecInfo,
hooks::Hooks,
@@ -53,11 +56,13 @@ impl Command {
self,
tool: &DbTool<N>,
task_executor: TaskExecutor,
data_dir: &ChainPath<DataDirPath>,
) -> eyre::Result<()> {
// Set up metrics server if requested
let _metrics_handle = if let Some(listen_addr) = self.metrics {
let chain_name = tool.provider_factory.chain_spec().chain().to_string();
let executor = task_executor.clone();
let pprof_dump_dir = data_dir.pprof_dumps();
let handle = task_executor.spawn_critical("metrics server", async move {
let config = MetricServerConfig::new(
@@ -73,6 +78,7 @@ impl Command {
ChainSpecInfo { name: chain_name },
executor,
Hooks::builder().build(),
pprof_dump_dir,
);
// Spawn the metrics server

View File

@@ -40,12 +40,17 @@ enum Subcommands {
#[clap(rename_all = "snake_case")]
pub enum SetCommand {
/// Store receipts in static files instead of the database
ReceiptsInStaticFiles {
Receipts {
#[clap(action(ArgAction::Set))]
value: bool,
},
/// Store transaction senders in static files instead of the database
TransactionSendersInStaticFiles {
TransactionSenders {
#[clap(action(ArgAction::Set))]
value: bool,
},
/// Store account changesets in static files instead of the database
AccountChangesets {
#[clap(action(ArgAction::Set))]
value: bool,
},
@@ -94,11 +99,12 @@ impl Command {
storages_history_in_rocksdb: _,
transaction_hash_numbers_in_rocksdb: _,
account_history_in_rocksdb: _,
account_changesets_in_static_files: _,
} = settings.unwrap_or_else(StorageSettings::legacy);
// Update the setting based on the key
match cmd {
SetCommand::ReceiptsInStaticFiles { value } => {
SetCommand::Receipts { value } => {
if settings.receipts_in_static_files == value {
println!("receipts_in_static_files is already set to {}", value);
return Ok(());
@@ -106,7 +112,7 @@ impl Command {
settings.receipts_in_static_files = value;
println!("Set receipts_in_static_files = {}", value);
}
SetCommand::TransactionSendersInStaticFiles { value } => {
SetCommand::TransactionSenders { value } => {
if settings.transaction_senders_in_static_files == value {
println!("transaction_senders_in_static_files is already set to {}", value);
return Ok(());
@@ -114,6 +120,14 @@ impl Command {
settings.transaction_senders_in_static_files = value;
println!("Set transaction_senders_in_static_files = {}", value);
}
SetCommand::AccountChangesets { value } => {
if settings.account_changesets_in_static_files == value {
println!("account_changesets_in_static_files is already set to {}", value);
return Ok(());
}
settings.account_changesets_in_static_files = value;
println!("Set account_changesets_in_static_files = {}", value);
}
}
// Write updated settings

View File

@@ -69,9 +69,7 @@ pub async fn import_blocks_from_file<N>(
provider_factory: ProviderFactory<N>,
config: &Config,
executor: impl ConfigureEvm<Primitives = N::Primitives> + 'static,
consensus: Arc<
impl FullConsensus<N::Primitives, Error = reth_consensus::ConsensusError> + 'static,
>,
consensus: Arc<impl FullConsensus<N::Primitives> + 'static>,
) -> eyre::Result<ImportResult>
where
N: ProviderNodeTypes,
@@ -198,7 +196,7 @@ pub fn build_import_pipeline_impl<N, C, E>(
) -> eyre::Result<(Pipeline<N>, impl futures::Stream<Item = NodeEvent<N::Primitives>> + use<N, C, E>)>
where
N: ProviderNodeTypes,
C: FullConsensus<N::Primitives, Error = reth_consensus::ConsensusError> + 'static,
C: FullConsensus<N::Primitives> + 'static,
E: ConfigureEvm<Primitives = N::Primitives> + 'static,
{
if !file_client.has_canonical_blocks() {

View File

@@ -99,6 +99,7 @@ where
/// * Headers: It will push an empty block.
/// * Transactions: It will not push any tx, only increments the end block range.
/// * Receipts: It will not push any receipt, only increments the end block range.
/// * TransactionSenders: If the segment exists, increments the end block range.
fn append_dummy_chain<N, F>(
sf_provider: &StaticFileProvider<N>,
target_height: BlockNumber,
@@ -110,8 +111,15 @@ where
{
let (tx, rx) = std::sync::mpsc::channel();
// Spawn jobs for incrementing the block end range of transactions and receipts
for segment in [StaticFileSegment::Transactions, StaticFileSegment::Receipts] {
// Spawn jobs for incrementing the block end range of transactions, receipts, and senders.
for segment in [
StaticFileSegment::Transactions,
StaticFileSegment::Receipts,
StaticFileSegment::TransactionSenders,
] {
if sf_provider.get_highest_static_file_block(segment).is_none() {
continue
}
let tx_clone = tx.clone();
let provider = sf_provider.clone();
std::thread::spawn(move || {
@@ -151,9 +159,15 @@ where
// If, for any reason, rayon crashes this verifies if all segments are at the same
// target_height.
for segment in
[StaticFileSegment::Headers, StaticFileSegment::Receipts, StaticFileSegment::Transactions]
{
for segment in [
StaticFileSegment::Headers,
StaticFileSegment::Receipts,
StaticFileSegment::Transactions,
StaticFileSegment::TransactionSenders,
] {
if sf_provider.get_highest_static_file_block(segment).is_none() {
continue
}
assert_eq!(
sf_provider.latest_writer(segment)?.user_header().block_end(),
Some(target_height),

View File

@@ -87,6 +87,9 @@ impl<C: ChainSpecParser> Command<C> {
.unwrap_or_default();
writer.prune_transaction_senders(to_delete, 0)?;
}
StaticFileSegment::AccountChangeSets => {
writer.prune_account_changesets(highest_block)?;
}
}
}
}

View File

@@ -1,5 +1,5 @@
use super::setup;
use reth_consensus::{noop::NoopConsensus, ConsensusError, FullConsensus};
use reth_consensus::{noop::NoopConsensus, FullConsensus};
use reth_db::DatabaseEnv;
use reth_db_api::{
cursor::DbCursorRO, database::Database, table::TableImporter, tables, transaction::DbTx,
@@ -28,7 +28,7 @@ pub(crate) async fn dump_execution_stage<N, E, C>(
where
N: ProviderNodeTypes<DB = Arc<DatabaseEnv>>,
E: ConfigureEvm<Primitives = N::Primitives> + 'static,
C: FullConsensus<E::Primitives, Error = ConsensusError> + 'static,
C: FullConsensus<E::Primitives> + 'static,
{
let (output_db, tip_block_number) = setup(from, to, &output_datadir.db(), db_tool)?;
@@ -169,7 +169,7 @@ fn dry_run<N, E, C>(
where
N: ProviderNodeTypes,
E: ConfigureEvm<Primitives = N::Primitives> + 'static,
C: FullConsensus<E::Primitives, Error = ConsensusError> + 'static,
C: FullConsensus<E::Primitives> + 'static,
{
info!(target: "reth::cli", "Executing stage. [dry-run]");

View File

@@ -4,7 +4,7 @@ use super::setup;
use alloy_primitives::{Address, BlockNumber};
use eyre::Result;
use reth_config::config::EtlConfig;
use reth_consensus::{ConsensusError, FullConsensus};
use reth_consensus::FullConsensus;
use reth_db::DatabaseEnv;
use reth_db_api::{database::Database, models::BlockNumberAddress, table::TableImporter, tables};
use reth_db_common::DbTool;
@@ -31,7 +31,7 @@ pub(crate) async fn dump_merkle_stage<N>(
output_datadir: ChainPath<DataDirPath>,
should_run: bool,
evm_config: impl ConfigureEvm<Primitives = N::Primitives>,
consensus: impl FullConsensus<N::Primitives, Error = ConsensusError> + 'static,
consensus: impl FullConsensus<N::Primitives> + 'static,
) -> Result<()>
where
N: ProviderNodeTypes<DB = Arc<DatabaseEnv>>,
@@ -79,7 +79,7 @@ fn unwind_and_copy<N: ProviderNodeTypes>(
tip_block_number: u64,
output_db: &DatabaseEnv,
evm_config: impl ConfigureEvm<Primitives = N::Primitives>,
consensus: impl FullConsensus<N::Primitives, Error = ConsensusError> + 'static,
consensus: impl FullConsensus<N::Primitives> + 'static,
) -> eyre::Result<()> {
let (from, to) = range;
let provider = db_tool.provider_factory.database_provider_rw()?;

View File

@@ -153,6 +153,7 @@ impl<C: ChainSpecParser<ChainSpec: EthChainSpec + Hardforks + EthereumHardforks>
}
})
.build(),
data_dir.pprof_dumps(),
);
MetricServer::new(config).serve().await?;

View File

@@ -18,8 +18,8 @@ use tracing::{debug, error, trace};
///
/// Provides utilities for running a cli command to completion.
#[derive(Debug)]
#[non_exhaustive]
pub struct CliRunner {
config: CliRunnerConfig,
tokio_runtime: tokio::runtime::Runtime,
}
@@ -29,12 +29,18 @@ impl CliRunner {
///
/// The default tokio runtime is multi-threaded, with both I/O and time drivers enabled.
pub fn try_default_runtime() -> Result<Self, std::io::Error> {
Ok(Self { tokio_runtime: tokio_runtime()? })
Ok(Self { config: CliRunnerConfig::default(), tokio_runtime: tokio_runtime()? })
}
/// Create a new [`CliRunner`] from a provided tokio [`Runtime`](tokio::runtime::Runtime).
pub const fn from_runtime(tokio_runtime: tokio::runtime::Runtime) -> Self {
Self { tokio_runtime }
Self { config: CliRunnerConfig::new(), tokio_runtime }
}
/// Sets the [`CliRunnerConfig`] for this runner.
pub const fn with_config(mut self, config: CliRunnerConfig) -> Self {
self.config = config;
self
}
/// Executes an async block on the runtime and blocks until completion.
@@ -74,7 +80,7 @@ impl CliRunner {
// after the command has finished or exit signal was received we shutdown the task
// manager which fires the shutdown signal to all tasks spawned via the task
// executor and awaiting on tasks spawned with graceful shutdown
task_manager.graceful_shutdown_with_timeout(Duration::from_secs(5));
task_manager.graceful_shutdown_with_timeout(self.config.graceful_shutdown_timeout);
}
// `drop(tokio_runtime)` would block the current thread until its pools
@@ -128,7 +134,7 @@ impl CliRunner {
error!(target: "reth::cli", "shutting down due to error");
} else {
debug!(target: "reth::cli", "shutting down gracefully");
task_manager.graceful_shutdown_with_timeout(Duration::from_secs(5));
task_manager.graceful_shutdown_with_timeout(self.config.graceful_shutdown_timeout);
}
// Shutdown the runtime on a separate thread
@@ -211,6 +217,38 @@ pub struct CliContext {
pub task_executor: TaskExecutor,
}
/// Default timeout for graceful shutdown of tasks.
const DEFAULT_GRACEFUL_SHUTDOWN_TIMEOUT: Duration = Duration::from_secs(5);
/// Configuration for [`CliRunner`].
#[derive(Debug, Clone)]
pub struct CliRunnerConfig {
/// Timeout for graceful shutdown of tasks.
///
/// After the command completes, this is the maximum time to wait for spawned tasks
/// to finish before forcefully terminating them.
pub graceful_shutdown_timeout: Duration,
}
impl Default for CliRunnerConfig {
fn default() -> Self {
Self::new()
}
}
impl CliRunnerConfig {
/// Creates a new config with default values.
pub const fn new() -> Self {
Self { graceful_shutdown_timeout: DEFAULT_GRACEFUL_SHUTDOWN_TIMEOUT }
}
/// Sets the graceful shutdown timeout.
pub const fn with_graceful_shutdown_timeout(mut self, timeout: Duration) -> Self {
self.graceful_shutdown_timeout = timeout;
self
}
}
/// Creates a new default tokio multi-thread [Runtime](tokio::runtime::Runtime) with all features
/// enabled
pub fn tokio_runtime() -> Result<tokio::runtime::Runtime, std::io::Error> {

View File

@@ -437,6 +437,8 @@ pub struct BlocksPerFileConfig {
pub receipts: Option<u64>,
/// Number of blocks per file for the transaction senders segment.
pub transaction_senders: Option<u64>,
/// Number of blocks per file for the account changesets segment.
pub account_change_sets: Option<u64>,
}
impl StaticFilesConfig {
@@ -444,8 +446,13 @@ impl StaticFilesConfig {
///
/// Returns an error if any blocks per file value is zero.
pub fn validate(&self) -> eyre::Result<()> {
let BlocksPerFileConfig { headers, transactions, receipts, transaction_senders } =
self.blocks_per_file;
let BlocksPerFileConfig {
headers,
transactions,
receipts,
transaction_senders,
account_change_sets,
} = self.blocks_per_file;
eyre::ensure!(headers != Some(0), "Headers segment blocks per file must be greater than 0");
eyre::ensure!(
transactions != Some(0),
@@ -459,13 +466,22 @@ impl StaticFilesConfig {
transaction_senders != Some(0),
"Transaction senders segment blocks per file must be greater than 0"
);
eyre::ensure!(
account_change_sets != Some(0),
"Account changesets segment blocks per file must be greater than 0"
);
Ok(())
}
/// Converts the blocks per file configuration into a [`HashMap`] per segment.
pub fn as_blocks_per_file_map(&self) -> HashMap<StaticFileSegment, u64> {
let BlocksPerFileConfig { headers, transactions, receipts, transaction_senders } =
self.blocks_per_file;
let BlocksPerFileConfig {
headers,
transactions,
receipts,
transaction_senders,
account_change_sets,
} = self.blocks_per_file;
let mut map = HashMap::new();
// Iterating over all possible segments allows us to do an exhaustive match here,
@@ -476,6 +492,7 @@ impl StaticFilesConfig {
StaticFileSegment::Transactions => transactions,
StaticFileSegment::Receipts => receipts,
StaticFileSegment::TransactionSenders => transaction_senders,
StaticFileSegment::AccountChangeSets => account_change_sets,
};
if let Some(blocks_per_file) = blocks_per_file {
@@ -527,7 +544,7 @@ impl PruneConfig {
/// Returns whether there is any kind of receipt pruning configuration.
pub fn has_receipts_pruning(&self) -> bool {
self.segments.receipts.is_some() || !self.segments.receipts_log_filter.is_empty()
self.segments.has_receipts_pruning()
}
/// Merges values from `other` into `self`.

View File

@@ -500,13 +500,11 @@ mod tests {
let expected_blob_gas_used = 10 * DATA_GAS_PER_BLOB;
// validate blob, it should fail blob gas used validation
assert_eq!(
validate_block_pre_execution(&block, &chain_spec),
Err(ConsensusError::BlobGasUsedDiff(GotExpected {
got: 1,
expected: expected_blob_gas_used
}))
);
assert!(matches!(
validate_block_pre_execution(&block, &chain_spec).unwrap_err(),
ConsensusError::BlobGasUsedDiff(diff)
if diff.got == 1 && diff.expected == expected_blob_gas_used
));
}
#[test]
@@ -517,10 +515,10 @@ mod tests {
// Test exceeding default - should fail
let header_33 = Header { extra_data: Bytes::from(vec![0; 33]), ..Default::default() };
assert_eq!(
validate_header_extra_data(&header_33, 32),
Err(ConsensusError::ExtraDataExceedsMax { len: 33 })
);
assert!(matches!(
validate_header_extra_data(&header_33, 32).unwrap_err(),
ConsensusError::ExtraDataExceedsMax { len } if len == 33
));
// Test with custom larger limit - should pass
assert!(validate_header_extra_data(&header_33, 64).is_ok());

View File

@@ -11,9 +11,10 @@
extern crate alloc;
use alloc::{boxed::Box, fmt::Debug, string::String, vec::Vec};
use alloc::{boxed::Box, fmt::Debug, string::String, sync::Arc, vec::Vec};
use alloy_consensus::Header;
use alloy_primitives::{BlockHash, BlockNumber, Bloom, B256};
use core::error::Error;
use reth_execution_types::BlockExecutionResult;
use reth_primitives_traits::{
constants::{GAS_LIMIT_BOUND_DIVISOR, MAXIMUM_GAS_LIMIT_BLOCK, MINIMUM_GAS_LIMIT},
@@ -49,15 +50,12 @@ pub trait FullConsensus<N: NodePrimitives>: Consensus<N::Block> {
/// Consensus is a protocol that chooses canonical chain.
#[auto_impl::auto_impl(&, Arc)]
pub trait Consensus<B: Block>: HeaderValidator<B::Header> {
/// The error type related to consensus.
type Error;
/// Ensures that body field values match the header.
fn validate_body_against_header(
&self,
body: &B::Body,
header: &SealedHeader<B::Header>,
) -> Result<(), Self::Error>;
) -> Result<(), ConsensusError>;
/// Validate a block disregarding world state, i.e. things that can be checked before sender
/// recovery and execution.
@@ -69,7 +67,7 @@ pub trait Consensus<B: Block>: HeaderValidator<B::Header> {
/// **This should not be called for the genesis block**.
///
/// Note: validating blocks does not include other validations of the Consensus
fn validate_block_pre_execution(&self, block: &SealedBlock<B>) -> Result<(), Self::Error>;
fn validate_block_pre_execution(&self, block: &SealedBlock<B>) -> Result<(), ConsensusError>;
}
/// `HeaderValidator` is a protocol that validates headers and their relationships.
@@ -125,7 +123,7 @@ pub trait HeaderValidator<H = Header>: Debug + Send + Sync {
}
/// Consensus Errors
#[derive(Debug, PartialEq, Eq, Clone, thiserror::Error)]
#[derive(Debug, Clone, thiserror::Error)]
pub enum ConsensusError {
/// Error when the gas used in the header exceeds the gas limit.
#[error("block used gas ({gas_used}) is greater than gas limit ({gas_limit})")]
@@ -410,6 +408,9 @@ pub enum ConsensusError {
/// Other, likely an injected L2 error.
#[error("{0}")]
Other(String),
/// Other unspecified error.
#[error(transparent)]
Custom(#[from] Arc<dyn Error + Send + Sync>),
}
impl ConsensusError {
@@ -447,3 +448,34 @@ pub struct TxGasLimitTooHighErr {
/// The maximum allowed gas limit
pub max_allowed: u64,
}
#[cfg(test)]
mod tests {
use super::*;
#[derive(thiserror::Error, Debug)]
#[error("Custom L2 consensus error")]
struct CustomL2Error;
#[test]
fn test_custom_error_conversion() {
// Test conversion from custom error to ConsensusError
let custom_err = CustomL2Error;
let arc_err: Arc<dyn Error + Send + Sync> = Arc::new(custom_err);
let consensus_err: ConsensusError = arc_err.into();
// Verify it's the Custom variant
assert!(matches!(consensus_err, ConsensusError::Custom(_)));
}
#[test]
fn test_custom_error_display() {
let custom_err = CustomL2Error;
let arc_err: Arc<dyn Error + Send + Sync> = Arc::new(custom_err);
let consensus_err: ConsensusError = arc_err.into();
// Verify the error message is preserved through transparent attribute
let error_message = format!("{}", consensus_err);
assert_eq!(error_message, "Custom L2 consensus error");
}
}

View File

@@ -55,19 +55,17 @@ impl<H> HeaderValidator<H> for NoopConsensus {
}
impl<B: Block> Consensus<B> for NoopConsensus {
type Error = ConsensusError;
/// Validates body against header (no-op implementation).
fn validate_body_against_header(
&self,
_body: &B::Body,
_header: &SealedHeader<B::Header>,
) -> Result<(), Self::Error> {
) -> Result<(), ConsensusError> {
Ok(())
}
/// Validates block before execution (no-op implementation).
fn validate_block_pre_execution(&self, _block: &SealedBlock<B>) -> Result<(), Self::Error> {
fn validate_block_pre_execution(&self, _block: &SealedBlock<B>) -> Result<(), ConsensusError> {
Ok(())
}
}

View File

@@ -61,13 +61,11 @@ impl<N: NodePrimitives> FullConsensus<N> for TestConsensus {
}
impl<B: Block> Consensus<B> for TestConsensus {
type Error = ConsensusError;
fn validate_body_against_header(
&self,
_body: &B::Body,
_header: &SealedHeader<B::Header>,
) -> Result<(), Self::Error> {
) -> Result<(), ConsensusError> {
if self.fail_body_against_header() {
Err(ConsensusError::BaseFeeMissing)
} else {
@@ -75,7 +73,7 @@ impl<B: Block> Consensus<B> for TestConsensus {
}
}
fn validate_block_pre_execution(&self, _block: &SealedBlock<B>) -> Result<(), Self::Error> {
fn validate_block_pre_execution(&self, _block: &SealedBlock<B>) -> Result<(), ConsensusError> {
if self.fail_validation() {
Err(ConsensusError::BaseFeeMissing)
} else {

View File

@@ -113,7 +113,6 @@ pub async fn setup_engine_with_chain_import(
let rocksdb_dir_path = datadir.join("rocksdb");
// Initialize the database using init_db (same as CLI import command)
// Use the same database arguments as the node will use
let db_args = reth_node_core::args::DatabaseArgs::default().database_args();
let db_env = reth_db::init_db(&db_path, db_args)?;
let db = Arc::new(db_env);
@@ -317,7 +316,8 @@ mod tests {
// Import the chain
{
let db_env = reth_db::init_db(&db_path, DatabaseArguments::default()).unwrap();
let db_args = reth_node_core::args::DatabaseArgs::default().database_args();
let db_env = reth_db::init_db(&db_path, db_args).unwrap();
let db = Arc::new(db_env);
let provider_factory: ProviderFactory<
@@ -475,7 +475,8 @@ mod tests {
let datadir = temp_dir.path().join("datadir");
std::fs::create_dir_all(&datadir).unwrap();
let db_path = datadir.join("db");
let db_env = reth_db::init_db(&db_path, DatabaseArguments::default()).unwrap();
let db_args = reth_node_core::args::DatabaseArgs::default().database_args();
let db_env = reth_db::init_db(&db_path, db_args).unwrap();
let db = Arc::new(reth_db::test_utils::TempDatabase::new(db_env, db_path));
// Create static files path

View File

@@ -135,6 +135,8 @@ pub struct TreeConfig {
storage_worker_count: usize,
/// Number of account proof worker threads.
account_worker_count: usize,
/// Whether to enable V2 storage proofs.
enable_proof_v2: bool,
}
impl Default for TreeConfig {
@@ -163,6 +165,7 @@ impl Default for TreeConfig {
allow_unwind_canonical_header: false,
storage_worker_count: default_storage_worker_count(),
account_worker_count: default_account_worker_count(),
enable_proof_v2: false,
}
}
}
@@ -194,6 +197,7 @@ impl TreeConfig {
allow_unwind_canonical_header: bool,
storage_worker_count: usize,
account_worker_count: usize,
enable_proof_v2: bool,
) -> Self {
Self {
persistence_threshold,
@@ -219,6 +223,7 @@ impl TreeConfig {
allow_unwind_canonical_header,
storage_worker_count,
account_worker_count,
enable_proof_v2,
}
}
@@ -500,4 +505,15 @@ impl TreeConfig {
self.account_worker_count = account_worker_count.max(MIN_WORKER_COUNT);
self
}
/// Return whether V2 storage proofs are enabled.
pub const fn enable_proof_v2(&self) -> bool {
self.enable_proof_v2
}
/// Setter for whether to enable V2 storage proofs.
pub const fn with_enable_proof_v2(mut self, enable_proof_v2: bool) -> Self {
self.enable_proof_v2 = enable_proof_v2;
self
}
}

View File

@@ -1,7 +1,7 @@
use futures::{Stream, StreamExt};
use pin_project::pin_project;
use reth_chainspec::EthChainSpec;
use reth_consensus::{ConsensusError, FullConsensus};
use reth_consensus::FullConsensus;
use reth_engine_primitives::{BeaconEngineMessage, ConsensusEngineEvent};
use reth_engine_tree::{
backfill::PipelineSync,
@@ -70,7 +70,7 @@ where
/// Constructor for `EngineService`.
#[expect(clippy::too_many_arguments)]
pub fn new<V, C>(
consensus: Arc<dyn FullConsensus<N::Primitives, Error = ConsensusError>>,
consensus: Arc<dyn FullConsensus<N::Primitives>>,
chain_spec: Arc<N::ChainSpec>,
client: Client,
incoming_requests: EngineMessageStream<N::Payload>,

View File

@@ -29,7 +29,6 @@ reth-provider.workspace = true
reth-prune.workspace = true
reth-revm.workspace = true
reth-stages-api.workspace = true
reth-storage-errors.workspace = true
reth-tasks.workspace = true
reth-trie-parallel.workspace = true
reth-trie-sparse = { workspace = true, features = ["std", "metrics"] }

View File

@@ -4,7 +4,7 @@ use crate::{engine::DownloadRequest, metrics::BlockDownloaderMetrics};
use alloy_consensus::BlockHeader;
use alloy_primitives::B256;
use futures::FutureExt;
use reth_consensus::{Consensus, ConsensusError};
use reth_consensus::Consensus;
use reth_network_p2p::{
full_block::{FetchFullBlockFuture, FetchFullBlockRangeFuture, FullBlockClient},
BlockClient,
@@ -81,7 +81,7 @@ where
B: Block,
{
/// Create a new instance
pub fn new(client: Client, consensus: Arc<dyn Consensus<B, Error = ConsensusError>>) -> Self {
pub fn new(client: Client, consensus: Arc<dyn Consensus<B>>) -> Self {
Self {
full_block_client: FullBlockClient::new(client, consensus),
inflight_full_block_requests: Vec::new(),

View File

@@ -6,6 +6,7 @@ use crate::{
download::{BlockDownloader, DownloadAction, DownloadOutcome},
};
use alloy_primitives::B256;
use crossbeam_channel::Sender;
use futures::{Stream, StreamExt};
use reth_chain_state::ExecutedBlock;
use reth_engine_primitives::{BeaconEngineMessage, ConsensusEngineEvent};
@@ -15,7 +16,6 @@ use reth_primitives_traits::{Block, NodePrimitives, SealedBlock};
use std::{
collections::HashSet,
fmt::Display,
sync::mpsc::Sender,
task::{ready, Context, Poll},
};
use tokio::sync::mpsc::UnboundedReceiver;

View File

@@ -1,5 +1,6 @@
use crate::metrics::PersistenceMetrics;
use alloy_eips::BlockNumHash;
use crossbeam_channel::Sender as CrossbeamSender;
use reth_chain_state::ExecutedBlock;
use reth_errors::ProviderError;
use reth_ethereum_primitives::EthPrimitives;
@@ -15,7 +16,6 @@ use std::{
time::Instant,
};
use thiserror::Error;
use tokio::sync::oneshot;
use tracing::{debug, error};
/// Writes parts of reth's in memory tree state to the database and static files.
@@ -183,13 +183,13 @@ pub enum PersistenceAction<N: NodePrimitives = EthPrimitives> {
///
/// First, header, transaction, and receipt-related data should be written to static files.
/// Then the execution history-related data will be written to the database.
SaveBlocks(Vec<ExecutedBlock<N>>, oneshot::Sender<Option<BlockNumHash>>),
SaveBlocks(Vec<ExecutedBlock<N>>, CrossbeamSender<Option<BlockNumHash>>),
/// Removes block data above the given block number from the database.
///
/// This will first update checkpoints from the database, then remove actual block data from
/// static files.
RemoveBlocksAbove(u64, oneshot::Sender<Option<BlockNumHash>>),
RemoveBlocksAbove(u64, CrossbeamSender<Option<BlockNumHash>>),
/// Update the persisted finalized block on disk
SaveFinalizedBlock(u64),
@@ -261,7 +261,7 @@ impl<T: NodePrimitives> PersistenceHandle<T> {
pub fn save_blocks(
&self,
blocks: Vec<ExecutedBlock<T>>,
tx: oneshot::Sender<Option<BlockNumHash>>,
tx: CrossbeamSender<Option<BlockNumHash>>,
) -> Result<(), SendError<PersistenceAction<T>>> {
self.send_action(PersistenceAction::SaveBlocks(blocks, tx))
}
@@ -290,7 +290,7 @@ impl<T: NodePrimitives> PersistenceHandle<T> {
pub fn remove_blocks_above(
&self,
block_num: u64,
tx: oneshot::Sender<Option<BlockNumHash>>,
tx: CrossbeamSender<Option<BlockNumHash>>,
) -> Result<(), SendError<PersistenceAction<T>>> {
self.send_action(PersistenceAction::RemoveBlocksAbove(block_num, tx))
}
@@ -319,22 +319,22 @@ mod tests {
PersistenceHandle::<EthPrimitives>::spawn_service(provider, pruner, sync_metrics_tx)
}
#[tokio::test]
async fn test_save_blocks_empty() {
#[test]
fn test_save_blocks_empty() {
reth_tracing::init_test_tracing();
let persistence_handle = default_persistence_handle();
let blocks = vec![];
let (tx, rx) = oneshot::channel();
let (tx, rx) = crossbeam_channel::bounded(1);
persistence_handle.save_blocks(blocks, tx).unwrap();
let hash = rx.await.unwrap();
let hash = rx.recv().unwrap();
assert_eq!(hash, None);
}
#[tokio::test]
async fn test_save_blocks_single_block() {
#[test]
fn test_save_blocks_single_block() {
reth_tracing::init_test_tracing();
let persistence_handle = default_persistence_handle();
let block_number = 0;
@@ -344,37 +344,35 @@ mod tests {
let block_hash = executed.recovered_block().hash();
let blocks = vec![executed];
let (tx, rx) = oneshot::channel();
let (tx, rx) = crossbeam_channel::bounded(1);
persistence_handle.save_blocks(blocks, tx).unwrap();
let BlockNumHash { hash: actual_hash, number: _ } =
tokio::time::timeout(std::time::Duration::from_secs(10), rx)
.await
.expect("test timed out")
.expect("channel closed unexpectedly")
.expect("no hash returned");
let BlockNumHash { hash: actual_hash, number: _ } = rx
.recv_timeout(std::time::Duration::from_secs(10))
.expect("test timed out")
.expect("no hash returned");
assert_eq!(block_hash, actual_hash);
}
#[tokio::test]
async fn test_save_blocks_multiple_blocks() {
#[test]
fn test_save_blocks_multiple_blocks() {
reth_tracing::init_test_tracing();
let persistence_handle = default_persistence_handle();
let mut test_block_builder = TestBlockBuilder::eth();
let blocks = test_block_builder.get_executed_blocks(0..5).collect::<Vec<_>>();
let last_hash = blocks.last().unwrap().recovered_block().hash();
let (tx, rx) = oneshot::channel();
let (tx, rx) = crossbeam_channel::bounded(1);
persistence_handle.save_blocks(blocks, tx).unwrap();
let BlockNumHash { hash: actual_hash, number: _ } = rx.await.unwrap().unwrap();
let BlockNumHash { hash: actual_hash, number: _ } = rx.recv().unwrap().unwrap();
assert_eq!(last_hash, actual_hash);
}
#[tokio::test]
async fn test_save_blocks_multiple_calls() {
#[test]
fn test_save_blocks_multiple_calls() {
reth_tracing::init_test_tracing();
let persistence_handle = default_persistence_handle();
@@ -383,11 +381,11 @@ mod tests {
for range in ranges {
let blocks = test_block_builder.get_executed_blocks(range).collect::<Vec<_>>();
let last_hash = blocks.last().unwrap().recovered_block().hash();
let (tx, rx) = oneshot::channel();
let (tx, rx) = crossbeam_channel::bounded(1);
persistence_handle.save_blocks(blocks, tx).unwrap();
let BlockNumHash { hash: actual_hash, number: _ } = rx.await.unwrap().unwrap();
let BlockNumHash { hash: actual_hash, number: _ } = rx.recv().unwrap().unwrap();
assert_eq!(last_hash, actual_hash);
}
}

View File

@@ -629,6 +629,11 @@ impl SavedCache {
Arc::strong_count(&self.usage_guard) == 1
}
/// Returns the current strong count of the usage guard.
pub(crate) fn usage_count(&self) -> usize {
Arc::strong_count(&self.usage_guard)
}
/// Returns the [`ExecutionCache`] belonging to the tracked hash.
pub(crate) const fn cache(&self) -> &ExecutionCache {
&self.caches

View File

@@ -6,15 +6,13 @@ use reth_errors::{BlockExecutionError, BlockValidationError, ProviderError};
use reth_evm::execute::InternalBlockExecutionError;
use reth_payload_primitives::NewPayloadError;
use reth_primitives_traits::{Block, BlockBody, SealedBlock};
use tokio::sync::oneshot::error::TryRecvError;
/// This is an error that can come from advancing persistence. Either this can be a
/// [`TryRecvError`], or this can be a [`ProviderError`]
/// This is an error that can come from advancing persistence.
#[derive(Debug, thiserror::Error)]
pub enum AdvancePersistenceError {
/// An error that can be from failing to receive a value from persistence
#[error(transparent)]
RecvError(#[from] TryRecvError),
/// The persistence channel was closed unexpectedly
#[error("persistence channel closed")]
ChannelClosed,
/// A provider error
#[error(transparent)]
Provider(#[from] ProviderError),

View File

@@ -321,7 +321,7 @@ impl NewPayloadStatusMetrics {
}
/// Metrics for non-execution related block validation.
#[derive(Metrics)]
#[derive(Metrics, Clone)]
#[metrics(scope = "sync.block_validation")]
pub(crate) struct BlockValidationMetrics {
/// Total number of storage tries updated in the state root calculation
@@ -348,6 +348,14 @@ pub(crate) struct BlockValidationMetrics {
pub(crate) post_execution_validation_duration: Histogram,
/// Total duration of the new payload call
pub(crate) total_duration: Histogram,
/// Size of `HashedPostStateSorted` (`total_len`)
pub(crate) hashed_post_state_size: Histogram,
/// Size of `TrieUpdatesSorted` (`total_len`)
pub(crate) trie_updates_sorted_size: Histogram,
/// Size of `AnchoredTrieInput` overlay `TrieUpdatesSorted` (`total_len`)
pub(crate) anchored_overlay_trie_updates_size: Histogram,
/// Size of `AnchoredTrieInput` overlay `HashedPostStateSorted` (`total_len`)
pub(crate) anchored_overlay_hashed_state_size: Histogram,
}
impl BlockValidationMetrics {

View File

@@ -37,18 +37,12 @@ use reth_revm::database::StateProviderDatabase;
use reth_stages_api::ControlFlow;
use revm::state::EvmState;
use state::TreeState;
use std::{
fmt::Debug,
ops,
sync::{
mpsc::{Receiver, RecvError, RecvTimeoutError, Sender},
Arc,
},
time::Instant,
};
use std::{fmt::Debug, ops, sync::Arc, time::Instant};
use crossbeam_channel::{Receiver, Sender};
use tokio::sync::{
mpsc::{unbounded_channel, UnboundedReceiver, UnboundedSender},
oneshot::{self, error::TryRecvError},
oneshot,
};
use tracing::*;
@@ -240,7 +234,7 @@ where
C: ConfigureEvm<Primitives = N> + 'static,
{
provider: P,
consensus: Arc<dyn FullConsensus<N, Error = ConsensusError>>,
consensus: Arc<dyn FullConsensus<N>>,
payload_validator: V,
/// Keeps track of internals such as executed and buffered blocks.
state: EngineApiTreeState<N>,
@@ -326,7 +320,7 @@ where
#[expect(clippy::too_many_arguments)]
pub fn new(
provider: P,
consensus: Arc<dyn FullConsensus<N, Error = ConsensusError>>,
consensus: Arc<dyn FullConsensus<N>>,
payload_validator: V,
outgoing: UnboundedSender<EngineApiEvent<N>>,
state: EngineApiTreeState<N>,
@@ -338,7 +332,7 @@ where
engine_kind: EngineApiKind,
evm_config: C,
) -> Self {
let (incoming_tx, incoming) = std::sync::mpsc::channel();
let (incoming_tx, incoming) = crossbeam_channel::unbounded();
Self {
provider,
@@ -368,7 +362,7 @@ where
#[expect(clippy::complexity)]
pub fn spawn_new(
provider: P,
consensus: Arc<dyn FullConsensus<N, Error = ConsensusError>>,
consensus: Arc<dyn FullConsensus<N>>,
payload_validator: V,
persistence: PersistenceHandle<N>,
payload_builder: PayloadBuilderHandle<T>,
@@ -423,8 +417,8 @@ where
/// This will block the current thread and process incoming messages.
pub fn run(mut self) {
loop {
match self.try_recv_engine_message() {
Ok(Some(msg)) => {
match self.wait_for_event() {
LoopEvent::EngineMessage(msg) => {
debug!(target: "engine::tree", %msg, "received new engine message");
match self.on_engine_message(msg) {
Ok(ops::ControlFlow::Break(())) => return,
@@ -435,15 +429,22 @@ where
}
}
}
Ok(None) => {
debug!(target: "engine::tree", "received no engine message for some time, while waiting for persistence task to complete");
LoopEvent::PersistenceComplete { result, start_time } => {
if let Err(err) = self.on_persistence_complete(result, start_time) {
error!(target: "engine::tree", %err, "Persistence complete handling failed");
return
}
}
Err(_err) => {
error!(target: "engine::tree", "Engine channel disconnected");
LoopEvent::Disconnected => {
error!(target: "engine::tree", "Channel disconnected");
return
}
}
// Always check if we need to trigger new persistence after any event:
// - After engine messages: new blocks may have been inserted that exceed the
// persistence threshold
// - After persistence completion: we can now persist more blocks if needed
if let Err(err) = self.advance_persistence() {
error!(target: "engine::tree", %err, "Advancing persistence failed");
return
@@ -451,6 +452,47 @@ where
}
}
/// Blocks until the next event is ready: either an incoming engine message or a persistence
/// completion (if one is in progress).
///
/// Uses biased selection to prioritize persistence completion to update in-memory state and
/// unblock further writes.
fn wait_for_event(&mut self) -> LoopEvent<T, N> {
// Take ownership of persistence rx if present
let maybe_persistence = self.persistence_state.rx.take();
if let Some((persistence_rx, start_time, action)) = maybe_persistence {
// Biased select prioritizes persistence completion to update in memory state and
// unblock further writes
crossbeam_channel::select_biased! {
recv(persistence_rx) -> result => {
// Don't put it back - consumed (oneshot-like behavior)
match result {
Ok(value) => LoopEvent::PersistenceComplete {
result: value,
start_time,
},
Err(_) => LoopEvent::Disconnected,
}
},
recv(self.incoming) -> msg => {
// Put the persistence rx back - we didn't consume it
self.persistence_state.rx = Some((persistence_rx, start_time, action));
match msg {
Ok(m) => LoopEvent::EngineMessage(m),
Err(_) => LoopEvent::Disconnected,
}
},
}
} else {
// No persistence in progress - just wait on incoming
match self.incoming.recv() {
Ok(m) => LoopEvent::EngineMessage(m),
Err(_) => LoopEvent::Disconnected,
}
}
}
/// Invoked when previously requested blocks were downloaded.
///
/// If the block count exceeds the configured batch size we're allowed to execute at once, this
@@ -1191,39 +1233,13 @@ where
.with_event(TreeEvent::Download(DownloadRequest::single_block(target))))
}
/// Attempts to receive the next engine request.
///
/// If there's currently no persistence action in progress, this will block until a new request
/// is received. If there's a persistence action in progress, this will try to receive the
/// next request with a timeout to not block indefinitely and return `Ok(None)` if no request is
/// received in time.
///
/// Returns an error if the engine channel is disconnected.
#[expect(clippy::type_complexity)]
fn try_recv_engine_message(
&self,
) -> Result<Option<FromEngine<EngineApiRequest<T, N>, N::Block>>, RecvError> {
if self.persistence_state.in_progress() {
// try to receive the next request with a timeout to not block indefinitely
match self.incoming.recv_timeout(std::time::Duration::from_millis(500)) {
Ok(msg) => Ok(Some(msg)),
Err(err) => match err {
RecvTimeoutError::Timeout => Ok(None),
RecvTimeoutError::Disconnected => Err(RecvError),
},
}
} else {
self.incoming.recv().map(Some)
}
}
/// Helper method to remove blocks and set the persistence state. This ensures we keep track of
/// the current persistence action while we're removing blocks.
fn remove_blocks(&mut self, new_tip_num: u64) {
debug!(target: "engine::tree", ?new_tip_num, last_persisted_block_number=?self.persistence_state.last_persisted_block.number, "Removing blocks using persistence task");
if new_tip_num < self.persistence_state.last_persisted_block.number {
debug!(target: "engine::tree", ?new_tip_num, "Starting remove blocks job");
let (tx, rx) = oneshot::channel();
let (tx, rx) = crossbeam_channel::bounded(1);
let _ = self.persistence.remove_blocks_above(new_tip_num, tx);
self.persistence_state.start_remove(new_tip_num, rx);
}
@@ -1245,35 +1261,17 @@ where
.expect("Checked non-empty persisting blocks");
debug!(target: "engine::tree", count=blocks_to_persist.len(), blocks = ?blocks_to_persist.iter().map(|block| block.recovered_block().num_hash()).collect::<Vec<_>>(), "Persisting blocks");
let (tx, rx) = oneshot::channel();
let (tx, rx) = crossbeam_channel::bounded(1);
let _ = self.persistence.save_blocks(blocks_to_persist, tx);
self.persistence_state.start_save(highest_num_hash, rx);
}
/// Attempts to advance the persistence state.
/// Triggers new persistence actions if no persistence task is currently in progress.
///
/// If we're currently awaiting a response this will try to receive the response (non-blocking)
/// or send a new persistence action if necessary.
/// This checks if we need to remove blocks (disk reorg) or save new blocks to disk.
/// Persistence completion is handled separately via the `wait_for_event` method.
fn advance_persistence(&mut self) -> Result<(), AdvancePersistenceError> {
if self.persistence_state.in_progress() {
let (mut rx, start_time, current_action) = self
.persistence_state
.rx
.take()
.expect("if a persistence task is in progress Receiver must be Some");
// Check if persistence has complete
match rx.try_recv() {
Ok(last_persisted_hash_num) => {
self.on_persistence_complete(last_persisted_hash_num, start_time)?;
}
Err(TryRecvError::Closed) => return Err(TryRecvError::Closed.into()),
Err(TryRecvError::Empty) => {
self.persistence_state.rx = Some((rx, start_time, current_action))
}
}
}
if !self.persistence_state.in_progress() {
if let Some(new_tip_num) = self.find_disk_reorg()? {
self.remove_blocks(new_tip_num)
@@ -1306,7 +1304,7 @@ where
loop {
// Wait for any in-progress persistence to complete (blocking)
if let Some((rx, start_time, _action)) = self.persistence_state.rx.take() {
let result = rx.blocking_recv().map_err(|_| TryRecvError::Closed)?;
let result = rx.recv().map_err(|_| AdvancePersistenceError::ChannelClosed)?;
self.on_persistence_complete(result, start_time)?;
}
@@ -1322,6 +1320,31 @@ where
}
}
/// Tries to poll for a completed persistence task (non-blocking).
///
/// Returns `true` if a persistence task was completed, `false` otherwise.
#[cfg(test)]
pub fn try_poll_persistence(&mut self) -> Result<bool, AdvancePersistenceError> {
let Some((rx, start_time, action)) = self.persistence_state.rx.take() else {
return Ok(false);
};
match rx.try_recv() {
Ok(result) => {
self.on_persistence_complete(result, start_time)?;
Ok(true)
}
Err(crossbeam_channel::TryRecvError::Empty) => {
// Not ready yet, put it back
self.persistence_state.rx = Some((rx, start_time, action));
Ok(false)
}
Err(crossbeam_channel::TryRecvError::Disconnected) => {
Err(AdvancePersistenceError::ChannelClosed)
}
}
}
/// Handles a completed persistence task.
fn on_persistence_complete(
&mut self,
@@ -2848,6 +2871,26 @@ where
}
}
/// Events received in the main engine loop.
#[derive(Debug)]
enum LoopEvent<T, N>
where
N: NodePrimitives,
T: PayloadTypes,
{
/// An engine API message was received.
EngineMessage(FromEngine<EngineApiRequest<T, N>, N::Block>),
/// A persistence task completed.
PersistenceComplete {
/// The result of the persistence operation.
result: Option<BlockNumHash>,
/// When the persistence operation started.
start_time: Instant,
},
/// A channel was disconnected.
Disconnected,
}
/// Block inclusion can be valid, accepted, or invalid. Invalid blocks are returned as an error
/// variant.
///

View File

@@ -1,7 +1,7 @@
//! Configured sparse trie enum for switching between serial and parallel implementations.
use alloy_primitives::B256;
use reth_trie::{Nibbles, ProofTrieNode, TrieMasks, TrieNode};
use reth_trie::{BranchNodeMasks, Nibbles, ProofTrieNode, TrieNode};
use reth_trie_sparse::{
errors::SparseTrieResult, provider::TrieNodeProvider, LeafLookup, LeafLookupError,
SerialSparseTrie, SparseTrieInterface, SparseTrieUpdates,
@@ -44,7 +44,7 @@ impl SparseTrieInterface for ConfiguredSparseTrie {
fn with_root(
self,
root: TrieNode,
masks: TrieMasks,
masks: Option<BranchNodeMasks>,
retain_updates: bool,
) -> SparseTrieResult<Self> {
match self {
@@ -75,7 +75,7 @@ impl SparseTrieInterface for ConfiguredSparseTrie {
&mut self,
path: Nibbles,
node: TrieNode,
masks: TrieMasks,
masks: Option<BranchNodeMasks>,
) -> SparseTrieResult<()> {
match self {
Self::Serial(trie) => trie.reveal_node(path, node, masks),

View File

@@ -274,24 +274,23 @@ where
let task_ctx = ProofTaskCtx::new(multiproof_provider_factory);
let storage_worker_count = config.storage_worker_count();
let account_worker_count = config.account_worker_count();
let v2_proofs_enabled = config.enable_proof_v2();
let proof_handle = ProofWorkerHandle::new(
self.executor.handle().clone(),
task_ctx,
storage_worker_count,
account_worker_count,
v2_proofs_enabled,
);
let multi_proof_task = MultiProofTask::new(
proof_handle.clone(),
to_sparse_trie,
config.multiproof_chunking_enabled().then_some(config.multiproof_chunk_size()),
to_multi_proof,
to_multi_proof.clone(),
from_multi_proof,
);
// wire the multiproof task to the prewarm task
let to_multi_proof = Some(multi_proof_task.state_root_message_sender());
// spawn multi-proof task
let parent_span = span.clone();
let saved_cache = prewarm_handle.saved_cache.clone();
@@ -316,7 +315,7 @@ where
self.spawn_sparse_trie_task(sparse_trie_rx, proof_handle, state_root_tx);
PayloadHandle {
to_multi_proof,
to_multi_proof: Some(to_multi_proof),
prewarm_handle,
state_root: Some(state_root_rx),
transactions: execution_rx,
@@ -492,38 +491,40 @@ where
BPF::AccountNodeProvider: TrieNodeProvider + Send + Sync,
BPF::StorageNodeProvider: TrieNodeProvider + Send + Sync,
{
// Reuse a stored SparseStateTrie, or create a new one using the desired configuration if
// there's none to reuse.
let cleared_sparse_trie = Arc::clone(&self.sparse_state_trie);
let sparse_state_trie = cleared_sparse_trie.lock().take().unwrap_or_else(|| {
let default_trie = SparseTrie::blind_from(if self.disable_parallel_sparse_trie {
ConfiguredSparseTrie::Serial(Default::default())
} else {
ConfiguredSparseTrie::Parallel(Box::new(
ParallelSparseTrie::default()
.with_parallelism_thresholds(PARALLEL_SPARSE_TRIE_PARALLELISM_THRESHOLDS),
))
});
ClearedSparseStateTrie::from_state_trie(
SparseStateTrie::new()
.with_accounts_trie(default_trie.clone())
.with_default_storage_trie(default_trie)
.with_updates(true),
)
});
let task =
SparseTrieTask::<_, ConfiguredSparseTrie, ConfiguredSparseTrie>::new_with_cleared_trie(
sparse_trie_rx,
proof_worker_handle,
self.trie_metrics.clone(),
sparse_state_trie,
);
let disable_parallel_sparse_trie = self.disable_parallel_sparse_trie;
let trie_metrics = self.trie_metrics.clone();
let span = Span::current();
self.executor.spawn_blocking(move || {
let _enter = span.entered();
// Reuse a stored SparseStateTrie, or create a new one using the desired configuration
// if there's none to reuse.
let sparse_state_trie = cleared_sparse_trie.lock().take().unwrap_or_else(|| {
let default_trie = SparseTrie::blind_from(if disable_parallel_sparse_trie {
ConfiguredSparseTrie::Serial(Default::default())
} else {
ConfiguredSparseTrie::Parallel(Box::new(
ParallelSparseTrie::default()
.with_parallelism_thresholds(PARALLEL_SPARSE_TRIE_PARALLELISM_THRESHOLDS),
))
});
ClearedSparseStateTrie::from_state_trie(
SparseStateTrie::new()
.with_accounts_trie(default_trie.clone())
.with_default_storage_trie(default_trie)
.with_updates(true),
)
});
let task = SparseTrieTask::<_, ConfiguredSparseTrie, ConfiguredSparseTrie>::new_with_cleared_trie(
sparse_trie_rx,
proof_worker_handle,
trie_metrics,
sparse_state_trie,
);
let (result, trie) = task.run();
// Send state root computation result
let _ = state_root_tx.send(result);
@@ -775,12 +776,34 @@ impl ExecutionCache {
warn!(blocked_for=?elapsed, "Blocked waiting for execution cache mutex");
}
cache
.as_ref()
if let Some(c) = cache.as_ref() {
let cached_hash = c.executed_block_hash();
// Check that the cache hash matches the parent hash of the current block. It won't
// match in case it's a fork block.
let hash_matches = cached_hash == parent_hash;
// Check `is_available()` to ensure no other tasks (e.g., prewarming) currently hold
// a reference to this cache. We can only reuse it when we have exclusive access.
.filter(|c| c.executed_block_hash() == parent_hash && c.is_available())
.cloned()
let available = c.is_available();
let usage_count = c.usage_count();
debug!(
target: "engine::caching",
%cached_hash,
%parent_hash,
hash_matches,
available,
usage_count,
"Existing cache found"
);
if hash_matches && available {
return Some(c.clone());
}
} else {
debug!(target: "engine::caching", %parent_hash, "No cache found");
}
None
}
/// Clears the tracked cache

File diff suppressed because it is too large Load Diff

View File

@@ -29,7 +29,6 @@ use alloy_evm::Database;
use alloy_primitives::{keccak256, map::B256Set, B256};
use crossbeam_channel::Sender as CrossbeamSender;
use metrics::{Counter, Gauge, Histogram};
use rayon::iter::{IntoParallelIterator, ParallelIterator};
use reth_evm::{execute::ExecutableTxFor, ConfigureEvm, Evm, EvmFor, SpecFor};
use reth_execution_types::ExecutionOutcome;
use reth_metrics::Metrics;
@@ -619,8 +618,7 @@ where
let _enter =
debug_span!(target: "engine::tree::payload_processor::prewarm", "prewarm outcome", index, tx_hash=%tx.tx().tx_hash())
.entered();
let targets = multiproof_targets_from_state(res.state);
let storage_targets = targets.storage_targets_count();
let (targets, storage_targets) = multiproof_targets_from_state(res.state);
metrics.prefetch_storage_targets.record(storage_targets as f64);
let _ = sender.send(PrewarmTaskEvent::Outcome { proof_targets: Some(targets) });
drop(_enter);
@@ -767,33 +765,37 @@ where
/// Returns a set of [`MultiProofTargets`] and the total amount of storage targets, based on the
/// given state.
fn multiproof_targets_from_state(state: EvmState) -> MultiProofTargets {
state
.into_par_iter()
.filter_map(|(address, account)| {
// if the account was not touched, or if the account was selfdestructed, do not
// fetch proofs for it
//
// Since selfdestruct can only happen in the same transaction, we can skip
// prefetching proofs for selfdestructed accounts
//
// See: https://eips.ethereum.org/EIPS/eip-6780
if !account.is_touched() || account.is_selfdestructed() {
return None;
fn multiproof_targets_from_state(state: EvmState) -> (MultiProofTargets, usize) {
let mut targets = MultiProofTargets::with_capacity(state.len());
let mut storage_targets = 0;
for (addr, account) in state {
// if the account was not touched, or if the account was selfdestructed, do not
// fetch proofs for it
//
// Since selfdestruct can only happen in the same transaction, we can skip
// prefetching proofs for selfdestructed accounts
//
// See: https://eips.ethereum.org/EIPS/eip-6780
if !account.is_touched() || account.is_selfdestructed() {
continue
}
let mut storage_set =
B256Set::with_capacity_and_hasher(account.storage.len(), Default::default());
for (key, slot) in account.storage {
// do nothing if unchanged
if !slot.is_changed() {
continue
}
let hashed_address = keccak256(address);
storage_set.insert(keccak256(B256::new(key.to_be_bytes())));
}
let storage_set: B256Set = account
.storage
.into_iter()
.filter(|(_, slot)| slot.is_changed())
.map(|(key, _)| keccak256(B256::new(key.to_be_bytes())))
.collect();
storage_targets += storage_set.len();
targets.insert(keccak256(addr), storage_set);
}
Some((hashed_address, storage_set))
})
.collect()
(targets, storage_targets)
}
/// The events the pre-warm task can handle.

View File

@@ -121,8 +121,9 @@ where
ParallelStateRootError::Other(format!("could not calculate state root: {e:?}"))
})?;
self.metrics.sparse_trie_final_update_duration_histogram.record(start.elapsed());
self.metrics.sparse_trie_total_duration_histogram.record(now.elapsed());
let end = Instant::now();
self.metrics.sparse_trie_final_update_duration_histogram.record(end.duration_since(start));
self.metrics.sparse_trie_total_duration_histogram.record(end.duration_since(now));
Ok(StateRootComputeOutcome { state_root, trie_updates })
}
@@ -173,7 +174,7 @@ where
.par_bridge()
.map(|(address, storage, storage_trie)| {
let _enter =
debug_span!(target: "engine::tree::payload_processor::sparse_trie", parent: span.clone(), "storage trie", ?address)
debug_span!(target: "engine::tree::payload_processor::sparse_trie", parent: &span, "storage trie", ?address)
.entered();
trace!(target: "engine::tree::payload_processor::sparse_trie", "Updating storage");

View File

@@ -34,13 +34,12 @@ use reth_primitives_traits::{
SealedHeader, SignerRecoverable,
};
use reth_provider::{
providers::OverlayStateProviderFactory, BlockExecutionOutput, BlockReader,
DatabaseProviderFactory, DatabaseProviderROFactory, ExecutionOutcome, HashedPostStateProvider,
ProviderError, PruneCheckpointReader, StageCheckpointReader, StateProvider,
StateProviderFactory, StateReader, TrieReader,
providers::OverlayStateProviderFactory, BlockExecutionOutput, BlockNumReader, BlockReader,
ChangeSetReader, DatabaseProviderFactory, DatabaseProviderROFactory, ExecutionOutcome,
HashedPostStateProvider, ProviderError, PruneCheckpointReader, StageCheckpointReader,
StateProvider, StateProviderFactory, StateReader, TrieReader,
};
use reth_revm::db::State;
use reth_storage_errors::db::DatabaseError;
use reth_trie::{updates::TrieUpdates, HashedPostState, StateRoot, TrieInputSorted};
use reth_trie_parallel::root::{ParallelStateRoot, ParallelStateRootError};
use revm_primitives::Address;
@@ -112,7 +111,7 @@ where
/// Provider for database access.
provider: P,
/// Consensus implementation for validation.
consensus: Arc<dyn FullConsensus<Evm::Primitives, Error = ConsensusError>>,
consensus: Arc<dyn FullConsensus<Evm::Primitives>>,
/// EVM configuration.
evm_config: Evm,
/// Configuration for the tree.
@@ -136,8 +135,15 @@ impl<N, P, Evm, V> BasicEngineValidator<P, Evm, V>
where
N: NodePrimitives,
P: DatabaseProviderFactory<
Provider: BlockReader + TrieReader + StageCheckpointReader + PruneCheckpointReader,
Provider: BlockReader
+ TrieReader
+ StageCheckpointReader
+ PruneCheckpointReader
+ ChangeSetReader
+ BlockNumReader,
> + BlockReader<Header = N::BlockHeader>
+ ChangeSetReader
+ BlockNumReader
+ StateProviderFactory
+ StateReader
+ HashedPostStateProvider
@@ -149,7 +155,7 @@ where
#[allow(clippy::too_many_arguments)]
pub fn new(
provider: P,
consensus: Arc<dyn FullConsensus<N, Error = ConsensusError>>,
consensus: Arc<dyn FullConsensus<N>>,
evm_config: Evm,
validator: V,
config: TreeConfig,
@@ -616,7 +622,8 @@ where
.without_state_clear()
.build();
let evm = self.evm_config.evm_with_env(&mut db, env.evm_env.clone());
let spec_id = *env.evm_env.spec_id();
let evm = self.evm_config.evm_with_env(&mut db, env.evm_env);
let ctx =
self.execution_ctx_for(input).map_err(|e| InsertBlockErrorKind::Other(Box::new(e)))?;
let mut executor = self.evm_config.create_executor(evm, ctx);
@@ -632,7 +639,7 @@ where
CachedPrecompile::wrap(
precompile,
self.precompile_cache_map.cache_for_address(*address),
*env.evm_env.spec_id(),
spec_id,
Some(metrics),
)
});
@@ -713,8 +720,7 @@ where
Ok(StateRoot::new(&provider, &provider)
.with_prefix_sets(prefix_sets.freeze())
.root_with_updates()
.map_err(Into::<DatabaseError>::into)?)
.root_with_updates()?)
}
/// Validates the block after execution.
@@ -1080,16 +1086,33 @@ where
ancestors,
);
let deferred_handle_task = deferred_trie_data.clone();
let deferred_compute_duration =
self.metrics.block_validation.deferred_trie_compute_duration.clone();
let block_validation_metrics = self.metrics.block_validation.clone();
// Spawn background task to compute trie data. Calling `wait_cloned` will compute from
// the stored inputs and cache the result, so subsequent calls return immediately.
let compute_trie_input_task = move || {
let result = panic::catch_unwind(AssertUnwindSafe(|| {
let compute_start = Instant::now();
let _ = deferred_handle_task.wait_cloned();
deferred_compute_duration.record(compute_start.elapsed().as_secs_f64());
let computed = deferred_handle_task.wait_cloned();
block_validation_metrics
.deferred_trie_compute_duration
.record(compute_start.elapsed().as_secs_f64());
// Record sizes of the computed trie data
block_validation_metrics
.hashed_post_state_size
.record(computed.hashed_state.total_len() as f64);
block_validation_metrics
.trie_updates_sorted_size
.record(computed.trie_updates.total_len() as f64);
if let Some(anchored) = &computed.anchored_trie_input {
block_validation_metrics
.anchored_overlay_trie_updates_size
.record(anchored.trie_input.nodes.total_len() as f64);
block_validation_metrics
.anchored_overlay_hashed_state_size
.record(anchored.trie_input.state.total_len() as f64);
}
}));
if result.is_err() {
@@ -1185,10 +1208,17 @@ pub trait EngineValidator<
impl<N, Types, P, Evm, V> EngineValidator<Types> for BasicEngineValidator<P, Evm, V>
where
P: DatabaseProviderFactory<
Provider: BlockReader + TrieReader + StageCheckpointReader + PruneCheckpointReader,
Provider: BlockReader
+ TrieReader
+ StageCheckpointReader
+ PruneCheckpointReader
+ ChangeSetReader
+ BlockNumReader,
> + BlockReader<Header = N::BlockHeader>
+ StateProviderFactory
+ StateReader
+ ChangeSetReader
+ BlockNumReader
+ HashedPostStateProvider
+ Clone
+ 'static,

View File

@@ -22,12 +22,12 @@
use alloy_eips::BlockNumHash;
use alloy_primitives::B256;
use crossbeam_channel::Receiver as CrossbeamReceiver;
use std::time::Instant;
use tokio::sync::oneshot;
use tracing::trace;
/// The state of the persistence task.
#[derive(Default, Debug)]
#[derive(Debug)]
pub struct PersistenceState {
/// Hash and number of the last block persisted.
///
@@ -36,7 +36,7 @@ pub struct PersistenceState {
/// Receiver end of channel where the result of the persistence task will be
/// sent when done. A None value means there's no persistence task in progress.
pub(crate) rx:
Option<(oneshot::Receiver<Option<BlockNumHash>>, Instant, CurrentPersistenceAction)>,
Option<(CrossbeamReceiver<Option<BlockNumHash>>, Instant, CurrentPersistenceAction)>,
}
impl PersistenceState {
@@ -50,7 +50,7 @@ impl PersistenceState {
pub(crate) fn start_remove(
&mut self,
new_tip_num: u64,
rx: oneshot::Receiver<Option<BlockNumHash>>,
rx: CrossbeamReceiver<Option<BlockNumHash>>,
) {
self.rx =
Some((rx, Instant::now(), CurrentPersistenceAction::RemovingBlocks { new_tip_num }));
@@ -60,7 +60,7 @@ impl PersistenceState {
pub(crate) fn start_save(
&mut self,
highest: BlockNumHash,
rx: oneshot::Receiver<Option<BlockNumHash>>,
rx: CrossbeamReceiver<Option<BlockNumHash>>,
) {
self.rx = Some((rx, Instant::now(), CurrentPersistenceAction::SavingBlocks { highest }));
}

View File

@@ -31,7 +31,7 @@ use std::{
collections::BTreeMap,
str::FromStr,
sync::{
mpsc::{channel, Receiver, Sender},
mpsc::{Receiver, Sender},
Arc,
},
};
@@ -97,6 +97,7 @@ struct TestChannel<T> {
impl<T: Send + 'static> TestChannel<T> {
/// Creates a new test channel
fn spawn_channel() -> (Sender<T>, Receiver<T>, TestChannelHandle) {
use std::sync::mpsc::channel;
let (original_tx, original_rx) = channel();
let (wrapped_tx, wrapped_rx) = channel();
let (release_tx, release_rx) = channel();
@@ -143,7 +144,9 @@ struct TestHarness {
BasicEngineValidator<MockEthProvider, MockEvmConfig, MockEngineValidator>,
MockEvmConfig,
>,
to_tree_tx: Sender<FromEngine<EngineApiRequest<EthEngineTypes, EthPrimitives>, Block>>,
to_tree_tx: crossbeam_channel::Sender<
FromEngine<EngineApiRequest<EthEngineTypes, EthPrimitives>, Block>,
>,
from_tree_rx: UnboundedReceiver<EngineApiEvent>,
blocks: Vec<ExecutedBlock>,
action_rx: Receiver<PersistenceAction>,
@@ -153,6 +156,7 @@ struct TestHarness {
impl TestHarness {
fn new(chain_spec: Arc<ChainSpec>) -> Self {
use std::sync::mpsc::channel;
let (action_tx, action_rx) = channel();
Self::with_persistence_channel(chain_spec, action_tx, action_rx)
}
@@ -205,7 +209,7 @@ impl TestHarness {
engine_api_tree_state,
canonical_in_memory_state,
persistence_handle,
PersistenceState::default(),
PersistenceState { last_persisted_block: BlockNumHash::default(), rx: None },
payload_builder,
// always assume enough parallelism for tests
TreeConfig::default().with_legacy_state_root(false).with_has_enough_parallelism(true),
@@ -399,10 +403,8 @@ impl ValidatorTestHarness {
/// Configure `PersistenceState` for specific persistence scenarios
fn start_persistence_operation(&mut self, action: CurrentPersistenceAction) {
use tokio::sync::oneshot;
// Create a dummy receiver for testing - it will never receive a value
let (_tx, rx) = oneshot::channel();
let (_tx, rx) = crossbeam_channel::bounded(1);
match action {
CurrentPersistenceAction::SavingBlocks { highest } => {
@@ -498,11 +500,17 @@ fn test_tree_persist_block_batch() {
test_harness.to_tree_tx.send(FromEngine::DownloadedBlocks(blocks)).unwrap();
// process the message
let msg = test_harness.tree.try_recv_engine_message().unwrap().unwrap();
let msg = match test_harness.tree.wait_for_event() {
super::LoopEvent::EngineMessage(msg) => msg,
other => panic!("unexpected event: {other:?}"),
};
let _ = test_harness.tree.on_engine_message(msg).unwrap();
// we now should receive the other batch
let msg = test_harness.tree.try_recv_engine_message().unwrap().unwrap();
let msg = match test_harness.tree.wait_for_event() {
super::LoopEvent::EngineMessage(msg) => msg,
other => panic!("unexpected event: {other:?}"),
};
match msg {
FromEngine::DownloadedBlocks(blocks) => {
assert_eq!(blocks.len(), tree_config.max_execute_block_batch_size());
@@ -753,8 +761,8 @@ async fn test_tree_state_on_new_head_reorg() {
})
);
// after advancing persistence, we should be at `None` for the next action
test_harness.tree.advance_persistence().unwrap();
// after polling persistence completion, we should be at `None` for the next action
test_harness.tree.try_poll_persistence().unwrap();
let current_action = test_harness.tree.persistence_state.current_action().cloned();
assert_eq!(current_action, None);

View File

@@ -51,7 +51,12 @@ jemalloc = [
"reth-node-metrics/jemalloc",
]
jemalloc-prof = [
"reth-node-core/jemalloc",
"jemalloc",
"reth-node-metrics/jemalloc-prof",
]
jemalloc-symbols = [
"jemalloc-prof",
"reth-node-metrics/jemalloc-symbols",
]
tracy-allocator = []
@@ -81,3 +86,5 @@ min-trace-logs = [
"tracing/release_max_level_trace",
"reth-node-core/min-trace-logs",
]
edge = ["reth-cli-commands/edge"]

View File

@@ -84,17 +84,15 @@ where
B: Block,
ChainSpec: EthChainSpec<Header = B::Header> + EthereumHardforks + Debug + Send + Sync,
{
type Error = ConsensusError;
fn validate_body_against_header(
&self,
body: &B::Body,
header: &SealedHeader<B::Header>,
) -> Result<(), Self::Error> {
) -> Result<(), ConsensusError> {
validate_body_against_header(body, header.header())
}
fn validate_block_pre_execution(&self, block: &SealedBlock<B>) -> Result<(), Self::Error> {
fn validate_block_pre_execution(&self, block: &SealedBlock<B>) -> Result<(), ConsensusError> {
validate_block_pre_execution(block, &self.chain_spec)
}
}
@@ -228,10 +226,12 @@ mod tests {
let parent = header_with_gas_limit(GAS_LIMIT_BOUND_DIVISOR * 10);
let child = header_with_gas_limit((parent.gas_limit + 5) as u64);
assert_eq!(
validate_against_parent_gas_limit(&child, &parent, &ChainSpec::<Header>::default()),
Ok(())
);
assert!(validate_against_parent_gas_limit(
&child,
&parent,
&ChainSpec::<Header>::default()
)
.is_ok());
}
#[test]
@@ -239,10 +239,11 @@ mod tests {
let parent = header_with_gas_limit(MINIMUM_GAS_LIMIT);
let child = header_with_gas_limit(MINIMUM_GAS_LIMIT - 1);
assert_eq!(
validate_against_parent_gas_limit(&child, &parent, &ChainSpec::<Header>::default()),
Err(ConsensusError::GasLimitInvalidMinimum { child_gas_limit: child.gas_limit as u64 })
);
assert!(matches!(
validate_against_parent_gas_limit(&child, &parent, &ChainSpec::<Header>::default()).unwrap_err(),
ConsensusError::GasLimitInvalidMinimum { child_gas_limit }
if child_gas_limit == child.gas_limit as u64
));
}
#[test]
@@ -252,13 +253,11 @@ mod tests {
parent.gas_limit + parent.gas_limit / GAS_LIMIT_BOUND_DIVISOR + 1,
);
assert_eq!(
validate_against_parent_gas_limit(&child, &parent, &ChainSpec::<Header>::default()),
Err(ConsensusError::GasLimitInvalidIncrease {
parent_gas_limit: parent.gas_limit,
child_gas_limit: child.gas_limit,
})
);
assert!(matches!(
validate_against_parent_gas_limit(&child, &parent, &ChainSpec::<Header>::default()).unwrap_err(),
ConsensusError::GasLimitInvalidIncrease { parent_gas_limit, child_gas_limit }
if parent_gas_limit == parent.gas_limit && child_gas_limit == child.gas_limit
));
}
#[test]
@@ -266,10 +265,12 @@ mod tests {
let parent = header_with_gas_limit(GAS_LIMIT_BOUND_DIVISOR * 10);
let child = header_with_gas_limit(parent.gas_limit - 5);
assert_eq!(
validate_against_parent_gas_limit(&child, &parent, &ChainSpec::<Header>::default()),
Ok(())
);
assert!(validate_against_parent_gas_limit(
&child,
&parent,
&ChainSpec::<Header>::default()
)
.is_ok());
}
#[test]
@@ -279,13 +280,11 @@ mod tests {
parent.gas_limit - parent.gas_limit / GAS_LIMIT_BOUND_DIVISOR - 1,
);
assert_eq!(
validate_against_parent_gas_limit(&child, &parent, &ChainSpec::<Header>::default()),
Err(ConsensusError::GasLimitInvalidDecrease {
parent_gas_limit: parent.gas_limit,
child_gas_limit: child.gas_limit,
})
);
assert!(matches!(
validate_against_parent_gas_limit(&child, &parent, &ChainSpec::<Header>::default()).unwrap_err(),
ConsensusError::GasLimitInvalidDecrease { parent_gas_limit, child_gas_limit }
if parent_gas_limit == parent.gas_limit && child_gas_limit == child.gas_limit
));
}
#[test]
@@ -300,9 +299,8 @@ mod tests {
..Default::default()
};
assert_eq!(
EthBeaconConsensus::new(chain_spec).validate_header(&SealedHeader::seal_slow(header,)),
Ok(())
);
assert!(EthBeaconConsensus::new(chain_spec)
.validate_header(&SealedHeader::seal_slow(header,))
.is_ok());
}
}

View File

@@ -170,18 +170,16 @@ mod tests {
let expected_receipts_root = B256::random();
let expected_logs_bloom = calculated_logs_bloom;
assert_eq!(
assert!(matches!(
compare_receipts_root_and_logs_bloom(
calculated_receipts_root,
calculated_logs_bloom,
expected_receipts_root,
expected_logs_bloom
),
Err(ConsensusError::BodyReceiptRootDiff(
GotExpected { got: calculated_receipts_root, expected: expected_receipts_root }
.into()
))
);
).unwrap_err(),
ConsensusError::BodyReceiptRootDiff(diff)
if diff.got == calculated_receipts_root && diff.expected == expected_receipts_root
));
}
#[test]
@@ -192,16 +190,15 @@ mod tests {
let expected_receipts_root = calculated_receipts_root;
let expected_logs_bloom = Bloom::random();
assert_eq!(
assert!(matches!(
compare_receipts_root_and_logs_bloom(
calculated_receipts_root,
calculated_logs_bloom,
expected_receipts_root,
expected_logs_bloom
),
Err(ConsensusError::BodyBloomLogDiff(
GotExpected { got: calculated_logs_bloom, expected: expected_logs_bloom }.into()
))
);
).unwrap_err(),
ConsensusError::BodyBloomLogDiff(diff)
if diff.got == calculated_logs_bloom && diff.expected == expected_logs_bloom
));
}
}

View File

@@ -24,8 +24,10 @@ use crate::BuiltPayloadConversionError;
/// Contains the built payload.
///
/// According to the [engine API specification](https://github.com/ethereum/execution-apis/blob/main/src/engine/README.md) the execution layer should build the initial version of the payload with an empty transaction set and then keep update it in order to maximize the revenue.
/// Therefore, the empty-block here is always available and full-block will be set/updated
/// afterward.
///
/// This struct represents a single built block at a point in time. The payload building process
/// creates a sequence of these payloads, starting with an empty block and progressively including
/// more transactions.
#[derive(Debug, Clone)]
pub struct EthBuiltPayload<N: NodePrimitives = EthPrimitives> {
/// Identifier of the payload

View File

@@ -29,6 +29,7 @@ async fn testing_rpc_build_block_works() -> eyre::Result<()> {
.expect("valid datadir"),
static_files_path: Some(tempdir.path().join("static")),
rocksdb_path: Some(tempdir.path().join("rocksdb")),
pprof_dumps_path: Some(tempdir.path().join("pprof")),
};
let config = NodeConfig::test().with_datadir_args(datadir_args).with_rpc(rpc_args);
let db = create_test_rw_db();

View File

@@ -152,6 +152,15 @@ jemalloc = [
"reth-ethereum-cli?/jemalloc",
"reth-node-core?/jemalloc",
]
jemalloc-prof = [
"jemalloc",
"reth-cli-util?/jemalloc-prof",
"reth-ethereum-cli?/jemalloc-prof",
]
jemalloc-symbols = [
"jemalloc-prof",
"reth-ethereum-cli?/jemalloc-symbols",
]
js-tracer = [
"rpc",
"reth-rpc/js-tracer",

View File

@@ -7,7 +7,7 @@ use reth_storage_errors::{db::DatabaseError, provider::ProviderError};
use thiserror::Error;
/// State root errors.
#[derive(Error, PartialEq, Eq, Clone, Debug)]
#[derive(Error, Clone, Debug)]
pub enum StateRootError {
/// Internal database error.
#[error(transparent)]
@@ -15,19 +15,25 @@ pub enum StateRootError {
/// Storage root error.
#[error(transparent)]
StorageRootError(#[from] StorageRootError),
/// Provider error when loading prefix sets
#[error(transparent)]
PrefixSetLoadError(#[from] ProviderError),
}
impl From<StateRootError> for DatabaseError {
fn from(err: StateRootError) -> Self {
match err {
impl From<StateRootError> for ProviderError {
fn from(value: StateRootError) -> Self {
match value {
StateRootError::Database(err) |
StateRootError::StorageRootError(StorageRootError::Database(err)) => err,
StateRootError::StorageRootError(StorageRootError::Database(err)) => {
Self::Database(err)
}
StateRootError::PrefixSetLoadError(err) => err,
}
}
}
/// Storage root error.
#[derive(Error, PartialEq, Eq, Clone, Debug)]
#[derive(Error, Clone, Debug)]
pub enum StorageRootError {
/// Internal database error.
#[error(transparent)]
@@ -43,7 +49,7 @@ impl From<StorageRootError> for DatabaseError {
}
/// State proof errors.
#[derive(Error, PartialEq, Eq, Clone, Debug)]
#[derive(Error, Clone, Debug)]
pub enum StateProofError {
/// Internal database error.
#[error(transparent)]

View File

@@ -1,7 +1,7 @@
//! Contains [Chain], a chain of blocks and their final state.
use crate::ExecutionOutcome;
use alloc::{borrow::Cow, collections::BTreeMap, vec::Vec};
use alloc::{borrow::Cow, collections::BTreeMap, sync::Arc, vec::Vec};
use alloy_consensus::{transaction::Recovered, BlockHeader};
use alloy_eips::{eip1898::ForkBlock, eip2718::Encodable2718, BlockNumHash};
use alloy_primitives::{Address, BlockHash, BlockNumber, TxHash};
@@ -10,8 +10,7 @@ use reth_primitives_traits::{
transaction::signed::SignedTransaction, Block, BlockBody, IndexedTx, NodePrimitives,
RecoveredBlock, SealedHeader,
};
use reth_trie_common::updates::TrieUpdates;
use revm::database::BundleState;
use reth_trie_common::{updates::TrieUpdatesSorted, HashedPostStateSorted};
/// A chain of blocks and their final state.
///
@@ -35,10 +34,10 @@ pub struct Chain<N: NodePrimitives = reth_ethereum_primitives::EthPrimitives> {
///
/// Additionally, it includes the individual state changes that led to the current state.
execution_outcome: ExecutionOutcome<N::Receipt>,
/// State trie updates after block is added to the chain.
/// NOTE: Currently, trie updates are present only for
/// single-block chains that extend the canonical chain.
trie_updates: Option<TrieUpdates>,
/// State trie updates for each block in the chain, keyed by block number.
trie_updates: BTreeMap<BlockNumber, Arc<TrieUpdatesSorted>>,
/// Hashed post state for each block in the chain, keyed by block number.
hashed_state: BTreeMap<BlockNumber, Arc<HashedPostStateSorted>>,
}
type ChainTxReceiptMeta<'a, N> = (
@@ -54,6 +53,7 @@ impl<N: NodePrimitives> Default for Chain<N> {
blocks: Default::default(),
execution_outcome: Default::default(),
trie_updates: Default::default(),
hashed_state: Default::default(),
}
}
}
@@ -67,22 +67,27 @@ impl<N: NodePrimitives> Chain<N> {
pub fn new(
blocks: impl IntoIterator<Item = RecoveredBlock<N::Block>>,
execution_outcome: ExecutionOutcome<N::Receipt>,
trie_updates: Option<TrieUpdates>,
trie_updates: BTreeMap<BlockNumber, Arc<TrieUpdatesSorted>>,
hashed_state: BTreeMap<BlockNumber, Arc<HashedPostStateSorted>>,
) -> Self {
let blocks =
blocks.into_iter().map(|b| (b.header().number(), b)).collect::<BTreeMap<_, _>>();
debug_assert!(!blocks.is_empty(), "Chain should have at least one block");
Self { blocks, execution_outcome, trie_updates }
Self { blocks, execution_outcome, trie_updates, hashed_state }
}
/// Create new Chain from a single block and its state.
pub fn from_block(
block: RecoveredBlock<N::Block>,
execution_outcome: ExecutionOutcome<N::Receipt>,
trie_updates: Option<TrieUpdates>,
trie_updates: Arc<TrieUpdatesSorted>,
hashed_state: Arc<HashedPostStateSorted>,
) -> Self {
Self::new([block], execution_outcome, trie_updates)
let block_number = block.header().number();
let trie_updates_map = BTreeMap::from([(block_number, trie_updates)]);
let hashed_state_map = BTreeMap::from([(block_number, hashed_state)]);
Self::new([block], execution_outcome, trie_updates_map, hashed_state_map)
}
/// Get the blocks in this chain.
@@ -100,14 +105,37 @@ impl<N: NodePrimitives> Chain<N> {
self.blocks.values().map(|block| block.clone_sealed_header())
}
/// Get cached trie updates for this chain.
pub const fn trie_updates(&self) -> Option<&TrieUpdates> {
self.trie_updates.as_ref()
/// Get all trie updates for this chain.
pub const fn trie_updates(&self) -> &BTreeMap<BlockNumber, Arc<TrieUpdatesSorted>> {
&self.trie_updates
}
/// Remove cached trie updates for this chain.
/// Get trie updates for a specific block number.
pub fn trie_updates_at(&self, block_number: BlockNumber) -> Option<&Arc<TrieUpdatesSorted>> {
self.trie_updates.get(&block_number)
}
/// Remove all trie updates for this chain.
pub fn clear_trie_updates(&mut self) {
self.trie_updates.take();
self.trie_updates.clear();
}
/// Get all hashed states for this chain.
pub const fn hashed_state(&self) -> &BTreeMap<BlockNumber, Arc<HashedPostStateSorted>> {
&self.hashed_state
}
/// Get hashed state for a specific block number.
pub fn hashed_state_at(
&self,
block_number: BlockNumber,
) -> Option<&Arc<HashedPostStateSorted>> {
self.hashed_state.get(&block_number)
}
/// Remove all hashed states for this chain.
pub fn clear_hashed_state(&mut self) {
self.hashed_state.clear();
}
/// Get execution outcome of this chain
@@ -120,12 +148,6 @@ impl<N: NodePrimitives> Chain<N> {
&mut self.execution_outcome
}
/// Prepends the given state to the current state.
pub fn prepend_state(&mut self, state: BundleState) {
self.execution_outcome.prepend_state(state);
self.trie_updates.take(); // invalidate cached trie updates
}
/// Return true if chain is empty and has no blocks.
pub fn is_empty(&self) -> bool {
self.blocks.is_empty()
@@ -161,11 +183,23 @@ impl<N: NodePrimitives> Chain<N> {
/// Destructure the chain into its inner components:
/// 1. The blocks contained in the chain.
/// 2. The execution outcome representing the final state.
/// 3. The optional trie updates.
/// 3. The trie updates map.
/// 4. The hashed state map.
#[allow(clippy::type_complexity)]
pub fn into_inner(
self,
) -> (ChainBlocks<'static, N::Block>, ExecutionOutcome<N::Receipt>, Option<TrieUpdates>) {
(ChainBlocks { blocks: Cow::Owned(self.blocks) }, self.execution_outcome, self.trie_updates)
) -> (
ChainBlocks<'static, N::Block>,
ExecutionOutcome<N::Receipt>,
BTreeMap<BlockNumber, Arc<TrieUpdatesSorted>>,
BTreeMap<BlockNumber, Arc<HashedPostStateSorted>>,
) {
(
ChainBlocks { blocks: Cow::Owned(self.blocks) },
self.execution_outcome,
self.trie_updates,
self.hashed_state,
)
}
/// Destructure the chain into its inner components:
@@ -295,10 +329,14 @@ impl<N: NodePrimitives> Chain<N> {
&mut self,
block: RecoveredBlock<N::Block>,
execution_outcome: ExecutionOutcome<N::Receipt>,
trie_updates: Arc<TrieUpdatesSorted>,
hashed_state: Arc<HashedPostStateSorted>,
) {
self.blocks.insert(block.header().number(), block);
let block_number = block.header().number();
self.blocks.insert(block_number, block);
self.execution_outcome.extend(execution_outcome);
self.trie_updates.take(); // reset
self.trie_updates.insert(block_number, trie_updates);
self.hashed_state.insert(block_number, hashed_state);
}
/// Merge two chains by appending the given chain into the current one.
@@ -317,7 +355,8 @@ impl<N: NodePrimitives> Chain<N> {
// Insert blocks from other chain
self.blocks.extend(other.blocks);
self.execution_outcome.extend(other.execution_outcome);
self.trie_updates.take(); // reset
self.trie_updates.extend(other.trie_updates);
self.hashed_state.extend(other.hashed_state);
Ok(())
}
@@ -442,14 +481,13 @@ pub struct BlockReceipts<T = reth_ethereum_primitives::Receipt> {
#[cfg(feature = "serde-bincode-compat")]
pub(super) mod serde_bincode_compat {
use crate::{serde_bincode_compat, ExecutionOutcome};
use alloc::{borrow::Cow, collections::BTreeMap};
use alloc::{borrow::Cow, collections::BTreeMap, sync::Arc};
use alloy_primitives::BlockNumber;
use reth_ethereum_primitives::EthPrimitives;
use reth_primitives_traits::{
serde_bincode_compat::{RecoveredBlock, SerdeBincodeCompat},
Block, NodePrimitives,
};
use reth_trie_common::serde_bincode_compat::updates::TrieUpdates;
use serde::{ser::SerializeMap, Deserialize, Deserializer, Serialize, Serializer};
use serde_with::{DeserializeAs, SerializeAs};
@@ -469,6 +507,7 @@ pub(super) mod serde_bincode_compat {
/// }
/// ```
#[derive(Debug, Serialize, Deserialize)]
#[serde(bound = "")]
pub struct Chain<'a, N = EthPrimitives>
where
N: NodePrimitives<
@@ -477,7 +516,19 @@ pub(super) mod serde_bincode_compat {
{
blocks: RecoveredBlocks<'a, N::Block>,
execution_outcome: serde_bincode_compat::ExecutionOutcome<'a, N::Receipt>,
trie_updates: Option<TrieUpdates<'a>>,
#[serde(default, rename = "trie_updates_legacy")]
_trie_updates_legacy:
Option<reth_trie_common::serde_bincode_compat::updates::TrieUpdates<'a>>,
#[serde(default)]
trie_updates: BTreeMap<
BlockNumber,
reth_trie_common::serde_bincode_compat::updates::TrieUpdatesSorted<'a>,
>,
#[serde(default)]
hashed_state: BTreeMap<
BlockNumber,
reth_trie_common::serde_bincode_compat::hashed_state::HashedPostStateSorted<'a>,
>,
}
#[derive(Debug)]
@@ -530,7 +581,17 @@ pub(super) mod serde_bincode_compat {
Self {
blocks: RecoveredBlocks(Cow::Borrowed(&value.blocks)),
execution_outcome: value.execution_outcome.as_repr(),
trie_updates: value.trie_updates.as_ref().map(Into::into),
_trie_updates_legacy: None,
trie_updates: value
.trie_updates
.iter()
.map(|(k, v)| (*k, v.as_ref().into()))
.collect(),
hashed_state: value
.hashed_state
.iter()
.map(|(k, v)| (*k, v.as_ref().into()))
.collect(),
}
}
}
@@ -545,7 +606,16 @@ pub(super) mod serde_bincode_compat {
Self {
blocks: value.blocks.0.into_owned(),
execution_outcome: ExecutionOutcome::from_repr(value.execution_outcome),
trie_updates: value.trie_updates.map(Into::into),
trie_updates: value
.trie_updates
.into_iter()
.map(|(k, v)| (k, Arc::new(v.into())))
.collect(),
hashed_state: value
.hashed_state
.into_iter()
.map(|(k, v)| (k, Arc::new(v.into())))
.collect(),
}
}
}
@@ -589,6 +659,8 @@ pub(super) mod serde_bincode_compat {
#[test]
fn test_chain_bincode_roundtrip() {
use alloc::collections::BTreeMap;
#[serde_as]
#[derive(Debug, PartialEq, Eq, Serialize, Deserialize)]
struct Data {
@@ -603,7 +675,8 @@ pub(super) mod serde_bincode_compat {
vec![RecoveredBlock::arbitrary(&mut arbitrary::Unstructured::new(&bytes))
.unwrap()],
Default::default(),
None,
BTreeMap::new(),
BTreeMap::new(),
),
};
@@ -620,7 +693,7 @@ mod tests {
use alloy_consensus::TxType;
use alloy_primitives::{Address, B256};
use reth_ethereum_primitives::Receipt;
use revm::{primitives::HashMap, state::AccountInfo};
use revm::{database::BundleState, primitives::HashMap, state::AccountInfo};
#[test]
fn chain_append() {
@@ -703,8 +776,12 @@ mod tests {
let mut block_state_extended = execution_outcome1;
block_state_extended.extend(execution_outcome2);
let chain: Chain =
Chain::new(vec![block1.clone(), block2.clone()], block_state_extended, None);
let chain: Chain = Chain::new(
vec![block1.clone(), block2.clone()],
block_state_extended,
BTreeMap::new(),
BTreeMap::new(),
);
// return tip state
assert_eq!(

View File

@@ -396,7 +396,7 @@ impl ExecutionOutcome {
/// Returns the ethereum receipt root for all recorded receipts.
///
/// Note: this function calculated Bloom filters for every receipt and created merkle trees
/// of receipt. This is a expensive operation.
/// of receipt. This is an expensive operation.
pub fn ethereum_receipts_root(&self, block_number: BlockNumber) -> Option<B256> {
self.generic_receipts_root_slow(
block_number,

View File

@@ -57,6 +57,7 @@ reth-evm-ethereum.workspace = true
reth-primitives-traits = { workspace = true, features = ["test-utils"] }
reth-provider = { workspace = true, features = ["test-utils"] }
reth-testing-utils.workspace = true
reth-trie-common = { workspace = true, features = ["serde-bincode-compat"] }
alloy-genesis.workspace = true
@@ -76,6 +77,7 @@ serde = [
"rand/serde",
"secp256k1/serde",
"reth-primitives-traits/serde",
"reth-trie-common/serde",
"reth-prune-types/serde",
"reth-config/serde",
"reth-ethereum-primitives/serde",

View File

@@ -1,6 +1,7 @@
use crate::StreamBackfillJob;
use reth_evm::ConfigureEvm;
use std::{
collections::BTreeMap,
ops::RangeInclusive,
time::{Duration, Instant},
};
@@ -148,7 +149,7 @@ where
executor.into_state().take_bundle(),
results,
);
let chain = Chain::new(blocks, outcome, None);
let chain = Chain::new(blocks, outcome, BTreeMap::new(), BTreeMap::new());
Ok(chain)
}
}

View File

@@ -38,11 +38,14 @@ use tokio_util::sync::{PollSendError, PollSender, ReusableBoxFuture};
/// or 17 minutes of 1-second blocks.
pub const DEFAULT_EXEX_MANAGER_CAPACITY: usize = 1024;
/// The maximum number of blocks allowed in the WAL before emitting a warning.
/// Default maximum number of blocks allowed in the WAL before emitting a warning.
///
/// This constant defines the threshold for the Write-Ahead Log (WAL) size. If the number of blocks
/// in the WAL exceeds this limit, a warning is logged to indicate potential issues.
pub const WAL_BLOCKS_WARNING: usize = 128;
/// This constant defines the default threshold for the Write-Ahead Log (WAL) size. If the number
/// of blocks in the WAL exceeds this limit, a warning is logged to indicate potential issues.
///
/// This value is appropriate for Ethereum mainnet with ~12 second block times. For L2 chains with
/// faster block times, this value should be increased proportionally to avoid excessive warnings.
pub const DEFAULT_WAL_BLOCKS_WARNING: usize = 128;
/// The source of the notification.
///
@@ -247,6 +250,8 @@ pub struct ExExManager<P, N: NodePrimitives> {
wal: Wal<N>,
/// A stream of finalized headers.
finalized_header_stream: ForkChoiceStream<SealedHeader<N::BlockHeader>>,
/// The threshold for the number of blocks in the WAL before emitting a warning.
wal_blocks_warning: usize,
/// A handle to the `ExEx` manager.
handle: ExExManagerHandle<N>,
@@ -306,6 +311,7 @@ where
wal,
finalized_header_stream,
wal_blocks_warning: DEFAULT_WAL_BLOCKS_WARNING,
handle: ExExManagerHandle {
exex_tx: handle_tx,
@@ -324,6 +330,16 @@ where
self.handle.clone()
}
/// Sets the threshold for the number of blocks in the WAL before emitting a warning.
///
/// For L2 chains with faster block times, this value should be increased proportionally
/// to avoid excessive warnings. For example, a chain with 2-second block times might use
/// a value 6x higher than the default.
pub const fn with_wal_blocks_warning(mut self, threshold: usize) -> Self {
self.wal_blocks_warning = threshold;
self
}
/// Updates the current buffer capacity and notifies all `is_ready` watchers of the manager's
/// readiness to receive notifications.
fn update_capacity(&self) {
@@ -390,10 +406,11 @@ where
.unwrap();
self.wal.finalize(lowest_finished_height)?;
if self.wal.num_blocks() > WAL_BLOCKS_WARNING {
if self.wal.num_blocks() > self.wal_blocks_warning {
warn!(
target: "exex::manager",
blocks = ?self.wal.num_blocks(),
threshold = self.wal_blocks_warning,
"WAL contains too many blocks and is not getting cleared. That will lead to increased disk space usage. Check that you emit the FinishedHeight event from your ExExes."
);
}
@@ -670,6 +687,7 @@ mod tests {
BlockWriter, Chain, DBProvider, DatabaseProviderFactory, TransactionVariant,
};
use reth_testing_utils::generators::{self, random_block, BlockParams};
use std::collections::BTreeMap;
fn empty_finalized_header_stream() -> ForkChoiceStream<SealedHeader> {
let (tx, rx) = watch::channel(None);
@@ -771,7 +789,12 @@ mod tests {
block1.set_block_number(10);
let notification1 = ExExNotification::ChainCommitted {
new: Arc::new(Chain::new(vec![block1.clone()], Default::default(), Default::default())),
new: Arc::new(Chain::new(
vec![block1.clone()],
Default::default(),
Default::default(),
Default::default(),
)),
};
// Push the first notification
@@ -789,7 +812,12 @@ mod tests {
block2.set_block_number(20);
let notification2 = ExExNotification::ChainCommitted {
new: Arc::new(Chain::new(vec![block2.clone()], Default::default(), Default::default())),
new: Arc::new(Chain::new(
vec![block2.clone()],
Default::default(),
Default::default(),
Default::default(),
)),
};
exex_manager.push_notification(notification2.clone());
@@ -832,7 +860,12 @@ mod tests {
block1.set_block_number(10);
let notification1 = ExExNotification::ChainCommitted {
new: Arc::new(Chain::new(vec![block1.clone()], Default::default(), Default::default())),
new: Arc::new(Chain::new(
vec![block1.clone()],
Default::default(),
Default::default(),
Default::default(),
)),
};
exex_manager.push_notification(notification1.clone());
@@ -1060,6 +1093,7 @@ mod tests {
vec![Default::default()],
Default::default(),
Default::default(),
Default::default(),
)),
};
@@ -1127,7 +1161,8 @@ mod tests {
// Setup a notification
let notification = ExExNotification::ChainCommitted {
new: Arc::new(Chain::new(
vec![block1.clone(), block2.clone()],
vec![Default::default()],
Default::default(),
Default::default(),
Default::default(),
)),
@@ -1174,7 +1209,12 @@ mod tests {
block1.set_block_number(10);
let notification = ExExNotification::ChainCommitted {
new: Arc::new(Chain::new(vec![block1.clone()], Default::default(), Default::default())),
new: Arc::new(Chain::new(
vec![block1.clone()],
Default::default(),
Default::default(),
Default::default(),
)),
};
let mut cx = Context::from_waker(futures::task::noop_waker_ref());
@@ -1320,10 +1360,20 @@ mod tests {
);
let genesis_notification = ExExNotification::ChainCommitted {
new: Arc::new(Chain::new(vec![genesis_block.clone()], Default::default(), None)),
new: Arc::new(Chain::new(
vec![genesis_block.clone()],
Default::default(),
BTreeMap::new(),
BTreeMap::new(),
)),
};
let notification = ExExNotification::ChainCommitted {
new: Arc::new(Chain::new(vec![block.clone()], Default::default(), None)),
new: Arc::new(Chain::new(
vec![block.clone()],
Default::default(),
BTreeMap::new(),
BTreeMap::new(),
)),
};
let (finalized_headers_tx, rx) = watch::channel(None);

View File

@@ -460,6 +460,7 @@ mod tests {
Chain, DBProvider, DatabaseProviderFactory,
};
use reth_testing_utils::generators::{self, random_block, BlockParams};
use std::collections::BTreeMap;
use tokio::sync::mpsc;
#[tokio::test]
@@ -499,7 +500,8 @@ mod tests {
)
.try_recover()?],
Default::default(),
None,
BTreeMap::new(),
BTreeMap::new(),
)),
};
@@ -567,7 +569,8 @@ mod tests {
.seal_slow()
.try_recover()?],
Default::default(),
None,
BTreeMap::new(),
BTreeMap::new(),
)),
};
@@ -634,7 +637,8 @@ mod tests {
new: Arc::new(Chain::new(
vec![exex_head_block.clone().try_recover()?],
Default::default(),
None,
BTreeMap::new(),
BTreeMap::new(),
)),
};
wal.commit(&exex_head_notification)?;
@@ -648,7 +652,8 @@ mod tests {
)
.try_recover()?],
Default::default(),
None,
BTreeMap::new(),
BTreeMap::new(),
)),
};
@@ -705,7 +710,8 @@ mod tests {
new: Arc::new(Chain::new(
vec![exex_head_block.clone().try_recover()?],
Default::default(),
None,
BTreeMap::new(),
BTreeMap::new(),
)),
};
wal.commit(&exex_head_notification)?;
@@ -724,7 +730,8 @@ mod tests {
)
.try_recover()?],
Default::default(),
None,
BTreeMap::new(),
BTreeMap::new(),
)),
};

View File

@@ -243,7 +243,7 @@ mod tests {
use reth_testing_utils::generators::{
self, random_block, random_block_range, BlockParams, BlockRangeParams,
};
use std::sync::Arc;
use std::{collections::BTreeMap, sync::Arc};
fn read_notifications(wal: &Wal) -> WalResult<Vec<ExExNotification>> {
wal.inner.storage.files_range()?.map_or(Ok(Vec::new()), |range| {
@@ -303,25 +303,38 @@ mod tests {
new: Arc::new(Chain::new(
vec![blocks[0].clone(), blocks[1].clone()],
Default::default(),
None,
BTreeMap::new(),
BTreeMap::new(),
)),
};
let reverted_notification = ExExNotification::ChainReverted {
old: Arc::new(Chain::new(vec![blocks[1].clone()], Default::default(), None)),
old: Arc::new(Chain::new(
vec![blocks[1].clone()],
Default::default(),
BTreeMap::new(),
BTreeMap::new(),
)),
};
let committed_notification_2 = ExExNotification::ChainCommitted {
new: Arc::new(Chain::new(
vec![block_1_reorged.clone(), blocks[2].clone()],
Default::default(),
None,
BTreeMap::new(),
BTreeMap::new(),
)),
};
let reorged_notification = ExExNotification::ChainReorged {
old: Arc::new(Chain::new(vec![blocks[2].clone()], Default::default(), None)),
old: Arc::new(Chain::new(
vec![blocks[2].clone()],
Default::default(),
BTreeMap::new(),
BTreeMap::new(),
)),
new: Arc::new(Chain::new(
vec![block_2_reorged.clone(), blocks[3].clone()],
Default::default(),
None,
BTreeMap::new(),
BTreeMap::new(),
)),
};

View File

@@ -178,12 +178,22 @@ where
#[cfg(test)]
mod tests {
use super::Storage;
use alloy_consensus::BlockHeader;
use alloy_primitives::{
map::{HashMap, HashSet},
B256, U256,
};
use reth_exex_types::ExExNotification;
use reth_primitives_traits::Account;
use reth_provider::Chain;
use reth_testing_utils::generators::{self, random_block};
use std::{fs::File, sync::Arc};
use reth_trie_common::{
updates::{StorageTrieUpdates, TrieUpdates},
BranchNodeCompact, HashedPostState, HashedStorage, Nibbles,
};
use std::{collections::BTreeMap, fs::File, sync::Arc};
// wal with 1 block and tx
// wal with 1 block and tx (old 3-field format)
// <https://github.com/paradigmxyz/reth/issues/15012>
#[test]
fn decode_notification_wal() {
@@ -202,6 +212,24 @@ mod tests {
}
}
// wal with 1 block and tx (new 4-field format with trie updates and hashed state)
#[test]
fn decode_notification_wal_new_format() {
let wal = include_bytes!("../../test-data/new_format.wal");
let notification: reth_exex_types::serde_bincode_compat::ExExNotification<
'_,
reth_ethereum_primitives::EthPrimitives,
> = rmp_serde::decode::from_slice(wal.as_slice()).unwrap();
let notification: ExExNotification = notification.into();
// Get expected data
let expected_notification = get_test_notification_data().unwrap();
assert_eq!(
&notification, &expected_notification,
"Decoded notification should match expected static data"
);
}
#[test]
fn test_roundtrip() -> eyre::Result<()> {
let mut rng = generators::rng();
@@ -213,8 +241,18 @@ mod tests {
let new_block = random_block(&mut rng, 0, Default::default()).try_recover()?;
let notification = ExExNotification::ChainReorged {
new: Arc::new(Chain::new(vec![new_block], Default::default(), None)),
old: Arc::new(Chain::new(vec![old_block], Default::default(), None)),
new: Arc::new(Chain::new(
vec![new_block],
Default::default(),
BTreeMap::new(),
BTreeMap::new(),
)),
old: Arc::new(Chain::new(
vec![old_block],
Default::default(),
BTreeMap::new(),
BTreeMap::new(),
)),
};
// Do a round trip serialization and deserialization
@@ -229,6 +267,97 @@ mod tests {
Ok(())
}
/// Generate a new WAL file for testing.
///
/// Run this test with `--ignored` to generate a new test WAL file:
/// ```sh
/// cargo test -p reth-exex generate_test_wal -- --ignored --nocapture
/// ```
#[test]
#[ignore]
fn generate_test_wal() -> eyre::Result<()> {
use std::io::Write;
let notification = get_test_notification_data()?;
// Serialize the notification
let notification_compat =
reth_exex_types::serde_bincode_compat::ExExNotification::from(&notification);
let encoded = rmp_serde::encode::to_vec(&notification_compat)?;
// Write to test-data directory
let test_data_dir = std::path::Path::new(env!("CARGO_MANIFEST_DIR")).join("test-data");
std::fs::create_dir_all(&test_data_dir)?;
let output_path = test_data_dir.join("new_format.wal");
let mut file = File::create(&output_path)?;
file.write_all(&encoded)?;
println!("Generated WAL file at: {}", output_path.display());
println!("File size: {} bytes", encoded.len());
println!("✓ WAL file created successfully!");
Ok(())
}
/// Helper function to generate deterministic test data for WAL tests
fn get_test_notification_data(
) -> eyre::Result<ExExNotification<reth_ethereum_primitives::EthPrimitives>> {
use reth_ethereum_primitives::Block;
use reth_primitives_traits::Block as _;
// Create a block with a transaction
let block = Block::default().seal_slow().try_recover()?;
let block_number = block.header().number();
let hashed_address = B256::from([1; 32]);
let storage_key = B256::from([2; 32]);
let trie_updates = TrieUpdates {
account_nodes: HashMap::from_iter([
(Nibbles::from_nibbles_unchecked([0x01]), BranchNodeCompact::default()),
(Nibbles::from_nibbles_unchecked([0x02]), BranchNodeCompact::default()),
]),
removed_nodes: HashSet::from_iter([Nibbles::from_nibbles_unchecked([0x03])]),
storage_tries: HashMap::from_iter([(
hashed_address,
StorageTrieUpdates {
is_deleted: false,
storage_nodes: HashMap::from_iter([(
Nibbles::from_nibbles_unchecked([0x04]),
BranchNodeCompact::default(),
)]),
removed_nodes: Default::default(),
},
)]),
};
let hashed_state = HashedPostState {
accounts: HashMap::from_iter([(
hashed_address,
Some(Account { nonce: 1, ..Default::default() }),
)]),
storages: HashMap::from_iter([(
hashed_address,
HashedStorage {
wiped: false,
storage: HashMap::from_iter([(storage_key, U256::from(101))]),
},
)]),
};
let notification: ExExNotification<reth_ethereum_primitives::EthPrimitives> =
ExExNotification::ChainCommitted {
new: Arc::new(Chain::new(
vec![block],
Default::default(),
BTreeMap::from([(block_number, Arc::new(trie_updates.into_sorted()))]),
BTreeMap::from([(block_number, Arc::new(hashed_state.into_sorted()))]),
)),
};
Ok(notification)
}
#[test]
fn test_files_range() -> eyre::Result<()> {
let temp_dir = tempfile::tempdir()?;

Binary file not shown.

View File

@@ -201,7 +201,7 @@ pub(super) mod serde_bincode_compat {
use reth_primitives_traits::RecoveredBlock;
use serde::{Deserialize, Serialize};
use serde_with::serde_as;
use std::sync::Arc;
use std::{collections::BTreeMap, sync::Arc};
#[test]
fn test_exex_notification_bincode_roundtrip() {
@@ -222,13 +222,15 @@ pub(super) mod serde_bincode_compat {
vec![RecoveredBlock::arbitrary(&mut arbitrary::Unstructured::new(&bytes))
.unwrap()],
Default::default(),
None,
BTreeMap::new(),
BTreeMap::new(),
)),
new: Arc::new(Chain::new(
vec![RecoveredBlock::arbitrary(&mut arbitrary::Unstructured::new(&bytes))
.unwrap()],
Default::default(),
None,
BTreeMap::new(),
BTreeMap::new(),
)),
},
};

View File

@@ -70,10 +70,11 @@ impl<T> Clone for UnboundedMeteredSender<T> {
}
}
/// A wrapper type around [Receiver](mpsc::UnboundedReceiver) that updates metrics on receive.
/// A wrapper type around [`UnboundedReceiver`](mpsc::UnboundedReceiver) that updates metrics on
/// receive.
#[derive(Debug)]
pub struct UnboundedMeteredReceiver<T> {
/// The [Receiver](mpsc::UnboundedReceiver) that this wraps around
/// The [`UnboundedReceiver`](mpsc::UnboundedReceiver) that this wraps around
receiver: mpsc::UnboundedReceiver<T>,
/// Holds metrics for this type
metrics: MeteredReceiverMetrics,

View File

@@ -459,7 +459,7 @@ pub struct Discv4Service {
ingress: IngressReceiver,
/// Sender for sending outgoing messages
///
/// Sends outgoind messages to the UDP task.
/// Sends outgoing messages to the UDP task.
egress: EgressSender,
/// Buffered pending pings to apply backpressure.
///
@@ -479,7 +479,7 @@ pub struct Discv4Service {
pending_find_nodes: HashMap<PeerId, FindNodeRequest>,
/// Currently active ENR requests
pending_enr_requests: HashMap<PeerId, EnrRequestState>,
/// Copy of he sender half of the commands channel for [Discv4]
/// Copy of the sender half of the commands channel for [Discv4]
to_service: mpsc::UnboundedSender<Discv4Command>,
/// Receiver half of the commands channel for [Discv4]
commands_rx: mpsc::UnboundedReceiver<Discv4Command>,

View File

@@ -5,7 +5,7 @@ use alloy_primitives::BlockNumber;
use futures::Stream;
use futures_util::StreamExt;
use reth_config::BodiesConfig;
use reth_consensus::{Consensus, ConsensusError};
use reth_consensus::Consensus;
use reth_network_p2p::{
bodies::{
client::BodiesClient,
@@ -41,7 +41,7 @@ pub struct BodiesDownloader<
/// The bodies client
client: Arc<C>,
/// The consensus client
consensus: Arc<dyn Consensus<B, Error = ConsensusError>>,
consensus: Arc<dyn Consensus<B>>,
/// The database handle
provider: Provider,
/// The maximum number of non-empty blocks per one request
@@ -307,12 +307,14 @@ where
{
type Block = B;
/// Set a new download range (exclusive).
/// Set a new download range (inclusive).
///
/// This method will drain all queued bodies, filter out ones outside the range and put them
/// back into the buffer.
/// If there are any bodies between the range start and last queued body that have not been
/// downloaded or are not in progress, they will be re-requested.
/// If the provided range is a suffix of the current range with the same end block, the
/// existing download already covers it and the call is a no-op.
/// If the range starts immediately after the current range, it is treated as the next
/// consecutive range and appended without resetting the in-flight state.
/// For all other ranges, the downloader state is cleared and the new range replaces the old
/// one.
fn set_download_range(&mut self, range: RangeInclusive<BlockNumber>) -> DownloadResult<()> {
// Check if the range is valid.
if range.is_empty() {
@@ -577,7 +579,7 @@ impl BodiesDownloaderBuilder {
pub fn build<B, C, Provider>(
self,
client: C,
consensus: Arc<dyn Consensus<B, Error = ConsensusError>>,
consensus: Arc<dyn Consensus<B>>,
provider: Provider,
) -> BodiesDownloader<B, C, Provider>
where

View File

@@ -4,7 +4,7 @@ use alloy_consensus::BlockHeader;
use alloy_primitives::BlockNumber;
use futures::{stream::FuturesUnordered, Stream};
use futures_util::StreamExt;
use reth_consensus::{Consensus, ConsensusError};
use reth_consensus::Consensus;
use reth_network_p2p::{
bodies::{client::BodiesClient, response::BlockResponse},
error::DownloadResult,
@@ -58,7 +58,7 @@ where
pub(crate) fn push_new_request(
&mut self,
client: Arc<C>,
consensus: Arc<dyn Consensus<B, Error = ConsensusError>>,
consensus: Arc<dyn Consensus<B>>,
request: Vec<SealedHeader<B::Header>>,
) {
// Set last max requested block number

View File

@@ -2,7 +2,7 @@ use crate::metrics::{BodyDownloaderMetrics, ResponseMetrics};
use alloy_consensus::BlockHeader;
use alloy_primitives::B256;
use futures::{Future, FutureExt};
use reth_consensus::{Consensus, ConsensusError};
use reth_consensus::Consensus;
use reth_network_p2p::{
bodies::{client::BodiesClient, response::BlockResponse},
error::{DownloadError, DownloadResult},
@@ -38,7 +38,7 @@ use std::{
/// and eventually disconnected.
pub(crate) struct BodiesRequestFuture<B: Block, C: BodiesClient<Body = B::Body>> {
client: Arc<C>,
consensus: Arc<dyn Consensus<B, Error = ConsensusError>>,
consensus: Arc<dyn Consensus<B>>,
metrics: BodyDownloaderMetrics,
/// Metrics for individual responses. This can be used to observe how the size (in bytes) of
/// responses change while bodies are being downloaded.
@@ -60,7 +60,7 @@ where
/// Returns an empty future. Use [`BodiesRequestFuture::with_headers`] to set the request.
pub(crate) fn new(
client: Arc<C>,
consensus: Arc<dyn Consensus<B, Error = ConsensusError>>,
consensus: Arc<dyn Consensus<B>>,
metrics: BodyDownloaderMetrics,
) -> Self {
Self {

View File

@@ -42,7 +42,7 @@ impl<B: Block + 'static> TaskDownloader<B> {
/// # Example
///
/// ```
/// use reth_consensus::{Consensus, ConsensusError};
/// use reth_consensus::Consensus;
/// use reth_downloaders::bodies::{bodies::BodiesDownloaderBuilder, task::TaskDownloader};
/// use reth_network_p2p::bodies::client::BodiesClient;
/// use reth_primitives_traits::{Block, InMemorySize};
@@ -55,7 +55,7 @@ impl<B: Block + 'static> TaskDownloader<B> {
/// Provider: HeaderProvider<Header = B::Header> + Unpin + 'static,
/// >(
/// client: Arc<C>,
/// consensus: Arc<dyn Consensus<B, Error = ConsensusError>>,
/// consensus: Arc<dyn Consensus<B>>,
/// provider: Provider,
/// ) {
/// let downloader =

View File

@@ -86,7 +86,7 @@ impl<B: FullBlock> FileClient<B> {
/// Create a new file client from a file path.
pub async fn new<P: AsRef<Path>>(
path: P,
consensus: Arc<dyn Consensus<B, Error = ConsensusError>>,
consensus: Arc<dyn Consensus<B>>,
) -> Result<Self, FileClientError> {
let file = File::open(path).await?;
Self::from_file(file, consensus).await
@@ -95,7 +95,7 @@ impl<B: FullBlock> FileClient<B> {
/// Initialize the [`FileClient`] with a file directly.
pub(crate) async fn from_file(
mut file: File,
consensus: Arc<dyn Consensus<B, Error = ConsensusError>>,
consensus: Arc<dyn Consensus<B>>,
) -> Result<Self, FileClientError> {
// get file len from metadata before reading
let metadata = file.metadata().await?;
@@ -200,7 +200,7 @@ impl<B: FullBlock> FileClient<B> {
}
struct FileClientBuilder<B: Block> {
pub consensus: Arc<dyn Consensus<B, Error = ConsensusError>>,
pub consensus: Arc<dyn Consensus<B>>,
pub parent_header: Option<SealedHeader<B::Header>>,
}
@@ -562,7 +562,7 @@ impl ChunkedFileReader {
/// are available before processing. For plain files, it uses the original chunking logic.
pub async fn next_chunk<B: FullBlock>(
&mut self,
consensus: Arc<dyn Consensus<B, Error = ConsensusError>>,
consensus: Arc<dyn Consensus<B>>,
parent_header: Option<SealedHeader<B::Header>>,
) -> Result<Option<FileClient<B>>, FileClientError> {
let Some(chunk_len) = self.read_next_chunk().await? else { return Ok(None) };
@@ -726,7 +726,7 @@ mod tests {
downloader.update_sync_target(SyncTarget::Tip(p0.hash()));
let headers = downloader.next().await.unwrap();
assert_eq!(headers, Ok(vec![p0, p1, p2]));
assert_eq!(headers.unwrap(), vec![p0, p1, p2]);
assert!(downloader.next().await.is_none());
assert!(downloader.next().await.is_none());
}

View File

@@ -1464,7 +1464,7 @@ mod tests {
.await;
let headers = downloader.next().await.unwrap();
assert_eq!(headers, Ok(vec![p0, p1, p2,]));
assert_eq!(headers.unwrap(), vec![p0, p1, p2,]);
assert!(downloader.buffered_responses.is_empty());
assert!(downloader.next().await.is_none());
assert!(downloader.next().await.is_none());
@@ -1496,18 +1496,18 @@ mod tests {
.await;
let headers = downloader.next().await.unwrap();
assert_eq!(headers, Ok(vec![p0]));
let headers = headers.unwrap();
assert_eq!(headers, vec![p0]);
assert_eq!(headers.capacity(), headers.len());
let headers = downloader.next().await.unwrap();
assert_eq!(headers, Ok(vec![p1]));
let headers = headers.unwrap();
assert_eq!(headers, vec![p1]);
assert_eq!(headers.capacity(), headers.len());
let headers = downloader.next().await.unwrap();
assert_eq!(headers, Ok(vec![p2]));
let headers = headers.unwrap();
assert_eq!(headers, vec![p2]);
assert_eq!(headers.capacity(), headers.len());
assert!(downloader.next().await.is_none());
@@ -1539,18 +1539,18 @@ mod tests {
.await;
let headers = downloader.next().await.unwrap();
assert_eq!(headers, Ok(vec![p0]));
let headers = headers.unwrap();
assert_eq!(headers, vec![p0]);
assert_eq!(headers.capacity(), headers.len());
let headers = downloader.next().await.unwrap();
assert_eq!(headers, Ok(vec![p1]));
let headers = headers.unwrap();
assert_eq!(headers, vec![p1]);
assert_eq!(headers.capacity(), headers.len());
let headers = downloader.next().await.unwrap();
assert_eq!(headers, Ok(vec![p2]));
let headers = headers.unwrap();
assert_eq!(headers, vec![p2]);
assert_eq!(headers.capacity(), headers.len());
assert!(downloader.next().await.is_none());

View File

@@ -223,11 +223,11 @@ mod tests {
.await;
let headers = downloader.next().await.unwrap();
assert_eq!(headers, Ok(vec![p0]));
assert_eq!(headers.unwrap(), vec![p0]);
let headers = downloader.next().await.unwrap();
assert_eq!(headers, Ok(vec![p1]));
assert_eq!(headers.unwrap(), vec![p1]);
let headers = downloader.next().await.unwrap();
assert_eq!(headers, Ok(vec![p2]));
assert_eq!(headers.unwrap(), vec![p2]);
}
}

View File

@@ -34,7 +34,6 @@ secp256k1 = { workspace = true, features = ["global-context", "std", "recovery",
rand_08.workspace = true
concat-kdf.workspace = true
sha2.workspace = true
sha3.workspace = true
aes.workspace = true
hmac.workspace = true
block-padding.workspace = true

View File

@@ -9,12 +9,12 @@ use crate::{
use aes::{cipher::StreamCipher, Aes128, Aes256};
use alloy_primitives::{
bytes::{BufMut, Bytes, BytesMut},
B128, B256, B512 as PeerId,
Keccak256, B128, B256, B512 as PeerId,
};
use alloy_rlp::{Encodable, Rlp, RlpEncodable, RlpMaxEncodedLen};
use byteorder::{BigEndian, ByteOrder, ReadBytesExt};
use ctr::Ctr64BE;
use digest::{crypto_common::KeyIvInit, Digest};
use digest::crypto_common::KeyIvInit;
use rand_08::{thread_rng as rng, Rng};
use reth_network_peers::{id2pk, pk2id};
use secp256k1::{
@@ -22,7 +22,6 @@ use secp256k1::{
PublicKey, SecretKey, SECP256K1,
};
use sha2::Sha256;
use sha3::Keccak256;
const PROTOCOL_VERSION: usize = 4;

View File

@@ -10,11 +10,10 @@
//! For more information, refer to the [Ethereum MAC specification](https://github.com/ethereum/devp2p/blob/master/rlpx.md#mac).
use aes::Aes256Enc;
use alloy_primitives::{B128, B256};
use alloy_primitives::{Keccak256, B128, B256};
use block_padding::NoPadding;
use cipher::BlockEncrypt;
use digest::KeyInit;
use sha3::{Digest, Keccak256};
/// [`Ethereum MAC`](https://github.com/ethereum/devp2p/blob/master/rlpx.md#mac) state.
///
@@ -57,7 +56,7 @@ impl MAC {
self.hasher.update(data);
let prev = self.digest();
let aes = Aes256Enc::new_from_slice(self.secret.as_ref()).unwrap();
let mut encrypted = self.digest().0;
let mut encrypted = prev.0;
aes.encrypt_padded::<NoPadding>(&mut encrypted, B128::len_bytes()).unwrap();
for i in 0..16 {

View File

@@ -757,12 +757,12 @@ impl RequestTxHashes {
Self::new(HashSet::with_capacity_and_hasher(capacity, Default::default()))
}
/// Returns an new empty instance.
/// Returns a new empty instance.
fn empty() -> Self {
Self::new(HashSet::default())
}
/// Retains the given number of elements, returning and iterator over the rest.
/// Retains the given number of elements, returning an iterator over the rest.
pub fn retain_count(&mut self, count: usize) -> Self {
let rest_capacity = self.hashes.len().saturating_sub(count);
if rest_capacity == 0 {

View File

@@ -3,7 +3,9 @@
use crate::{
eth_requests::EthRequestHandler,
transactions::{
config::{StrictEthAnnouncementFilter, TransactionPropagationKind},
config::{
AnnouncementFilteringPolicy, StrictEthAnnouncementFilter, TransactionPropagationKind,
},
policy::NetworkPolicies,
TransactionPropagationPolicy, TransactionsManager, TransactionsManagerConfig,
},
@@ -84,17 +86,41 @@ impl<Tx, Eth, N: NetworkPrimitives> NetworkBuilder<Tx, Eth, N> {
}
/// Creates a new [`TransactionsManager`] and wires it to the network.
pub fn transactions_with_policy<Pool: TransactionPool>(
///
/// Uses the default [`StrictEthAnnouncementFilter`] for announcement filtering.
pub fn transactions_with_policy<Pool: TransactionPool, P: TransactionPropagationPolicy<N>>(
self,
pool: Pool,
transactions_manager_config: TransactionsManagerConfig,
propagation_policy: impl TransactionPropagationPolicy<N>,
propagation_policy: P,
) -> NetworkBuilder<TransactionsManager<Pool, N>, Eth, N> {
self.transactions_with_policies(
pool,
transactions_manager_config,
propagation_policy,
StrictEthAnnouncementFilter::default(),
)
}
/// Creates a new [`TransactionsManager`] with custom propagation and announcement policies.
///
/// This allows chains with custom transaction types (like CATX) to configure
/// the announcement filter to accept their transaction types.
pub fn transactions_with_policies<
Pool: TransactionPool,
P: TransactionPropagationPolicy<N>,
A: AnnouncementFilteringPolicy<N>,
>(
self,
pool: Pool,
transactions_manager_config: TransactionsManagerConfig,
propagation_policy: P,
announcement_policy: A,
) -> NetworkBuilder<TransactionsManager<Pool, N>, Eth, N> {
let Self { mut network, request_handler, .. } = self;
let (tx, rx) = mpsc::unbounded_channel();
network.set_transactions(tx);
let handle = network.handle().clone();
let announcement_policy = StrictEthAnnouncementFilter::default();
let policies = NetworkPolicies::new(propagation_policy, announcement_policy);
let transactions = TransactionsManager::with_policy(

View File

@@ -1245,7 +1245,7 @@ pub enum PeerAction {
PeerRemoved(PeerId),
}
/// Error thrown when a incoming connection is rejected right away
/// Error thrown when an incoming connection is rejected right away
#[derive(Debug, Error, PartialEq, Eq)]
pub enum InboundConnectionError {
/// The remote's ip address is banned

View File

@@ -637,7 +637,7 @@ impl<Pool: TransactionPool, N: NetworkPrimitives> TransactionsManager<Pool, N> {
//
// known txns have already been successfully fetched or received over gossip.
//
// most hashes will be filtered out here since this the mempool protocol is a gossip
// most hashes will be filtered out here since the mempool protocol is a gossip
// protocol, healthy peers will send many of the same hashes.
//
let hashes_count_pre_pool_filter = partially_valid_msg.len();
@@ -2000,7 +2000,7 @@ impl<N: NetworkPrimitives> PeerMetadata<N> {
&self.request_tx
}
/// Return a
/// Returns a mutable reference to the seen transactions LRU cache.
pub const fn seen_transactions_mut(&mut self) -> &mut LruCache<TxHash> {
&mut self.seen_transactions
}

Some files were not shown because too many files have changed in this diff Show More