mirror of
https://github.com/paradigmxyz/reth.git
synced 2026-04-30 03:01:58 -04:00
Compare commits
99 Commits
feat/use-h
...
push
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
d6324d63e2 | ||
|
|
5f3ade1bfe | ||
|
|
b053f6fafe | ||
|
|
2a58e7a077 | ||
|
|
793a3d5fb3 | ||
|
|
89ae1af694 | ||
|
|
9c33fb5d45 | ||
|
|
bef3d7b4d1 | ||
|
|
e918c17af9 | ||
|
|
fcc170d53c | ||
|
|
c685842ba2 | ||
|
|
564ffa5868 | ||
|
|
12891dd171 | ||
|
|
c1015022f5 | ||
|
|
e3fe6326bc | ||
|
|
e3d520b24f | ||
|
|
9f29939ea1 | ||
|
|
10881d1c73 | ||
|
|
408593467b | ||
|
|
8caf8cdf11 | ||
|
|
1e8030ef28 | ||
|
|
f72c503d6f | ||
|
|
42890e6e7f | ||
|
|
e30e441ada | ||
|
|
121160d248 | ||
|
|
7ff78ca082 | ||
|
|
d7f56d509c | ||
|
|
3300e404cf | ||
|
|
77cb99fc78 | ||
|
|
66169c7e7c | ||
|
|
4f5fafc8f3 | ||
|
|
0b8e6c6ed3 | ||
|
|
4a62d38af2 | ||
|
|
dc4f249f09 | ||
|
|
c915841a45 | ||
|
|
217a337d8c | ||
|
|
74d57008b6 | ||
|
|
f8767bc678 | ||
|
|
81c83bba68 | ||
|
|
cd8ec58703 | ||
|
|
931b17c3fd | ||
|
|
807d328cf0 | ||
|
|
8a6bbd29fe | ||
|
|
8bedaaee71 | ||
|
|
09cd105671 | ||
|
|
a0b60b7e64 | ||
|
|
90e15d096d | ||
|
|
a161ca294f | ||
|
|
3a5c41e3da | ||
|
|
968d3c9534 | ||
|
|
fc6666f6a7 | ||
|
|
ff3a854326 | ||
|
|
04543ed16b | ||
|
|
ae3f0d4d1a | ||
|
|
5bccdc4a5d | ||
|
|
0b7cd60668 | ||
|
|
aa983b49af | ||
|
|
2aff617767 | ||
|
|
2c5d00ffb5 | ||
|
|
e2a3527414 | ||
|
|
e4cb3d3aed | ||
|
|
079b7b9d57 | ||
|
|
8a25d7d3cf | ||
|
|
a5ced84098 | ||
|
|
59760a2fe3 | ||
|
|
b9d21f293e | ||
|
|
dec1cad318 | ||
|
|
165b94c3fa | ||
|
|
69e4c06ae7 | ||
|
|
1406a984a7 | ||
|
|
93d6b9782c | ||
|
|
68e4ff1f7d | ||
|
|
33467ea6dd | ||
|
|
3bf9280b3c | ||
|
|
5c93986e6d | ||
|
|
779e0eb8bb | ||
|
|
5c4163c177 | ||
|
|
c5d1f70dd3 | ||
|
|
a8ec78fc87 | ||
|
|
1ecbb0b9d6 | ||
|
|
a40647e651 | ||
|
|
b25b8c00ee | ||
|
|
b20a99e1c9 | ||
|
|
9ec0e3cd51 | ||
|
|
09837bbdb4 | ||
|
|
198e457a12 | ||
|
|
c727c61101 | ||
|
|
366857559b | ||
|
|
ccd15e8a25 | ||
|
|
67f89fa4b2 | ||
|
|
a87510069d | ||
|
|
b3fe168548 | ||
|
|
8d7583b6fb | ||
|
|
32466fe223 | ||
|
|
f2061991c5 | ||
|
|
a549b4d66d | ||
|
|
cdcea2bd33 | ||
|
|
3898cc5c3d | ||
|
|
c558c1d10f |
5
.changelog/bold-frogs-run.md
Normal file
5
.changelog/bold-frogs-run.md
Normal file
@@ -0,0 +1,5 @@
|
||||
---
|
||||
reth-transaction-pool: patch
|
||||
---
|
||||
|
||||
Renamed and documented validation methods for clarity. `validate_one_no_state` and `validate_one_against_state` are now public methods `validate_stateless` and `validate_stateful` with improved documentation explaining their respective validation phases.
|
||||
5
.changelog/dry-ducks-write.md
Normal file
5
.changelog/dry-ducks-write.md
Normal file
@@ -0,0 +1,5 @@
|
||||
---
|
||||
reth-network: minor
|
||||
---
|
||||
|
||||
Added reason label to backed_off_peers metric. The metric now tracks backed off peers by reason (too_many_peers, graceful_close, connection_error) to improve observability.
|
||||
5
.changelog/easy-clouds-meow.md
Normal file
5
.changelog/easy-clouds-meow.md
Normal file
@@ -0,0 +1,5 @@
|
||||
---
|
||||
ef-tests: patch
|
||||
---
|
||||
|
||||
Removed reth-stateless crate and stateless validation from ef-tests.
|
||||
6
.changelog/fair-winds-growl.md
Normal file
6
.changelog/fair-winds-growl.md
Normal file
@@ -0,0 +1,6 @@
|
||||
---
|
||||
reth-exex: patch
|
||||
reth-exex-types: patch
|
||||
---
|
||||
|
||||
Added configurable backfill thresholds to ExEx notifications stream and added regression tests for state provider parity between pipeline and backfill execution paths.
|
||||
4
.changelog/fast-fish-cry.md
Normal file
4
.changelog/fast-fish-cry.md
Normal file
@@ -0,0 +1,4 @@
|
||||
---
|
||||
---
|
||||
|
||||
Added WebSocket subscription integration tests for eth_subscribe.
|
||||
4
.changelog/fast-waves-smile.md
Normal file
4
.changelog/fast-waves-smile.md
Normal file
@@ -0,0 +1,4 @@
|
||||
---
|
||||
---
|
||||
|
||||
Improved nightly Docker build failure Slack notification with more detailed formatting and context.
|
||||
7
.changelog/icy-lions-slide.md
Normal file
7
.changelog/icy-lions-slide.md
Normal file
@@ -0,0 +1,7 @@
|
||||
---
|
||||
reth: patch
|
||||
reth-cli-commands: patch
|
||||
reth-node-core: patch
|
||||
---
|
||||
|
||||
Removed experimental ress protocol support for stateless Ethereum nodes.
|
||||
5
.changelog/lazy-lakes-shout.md
Normal file
5
.changelog/lazy-lakes-shout.md
Normal file
@@ -0,0 +1,5 @@
|
||||
---
|
||||
reth-node-builder: patch
|
||||
---
|
||||
|
||||
Removed biased select in engine service loop to allow fair scheduling of shutdown requests alongside event processing.
|
||||
5
.changelog/lively-clams-drink.md
Normal file
5
.changelog/lively-clams-drink.md
Normal file
@@ -0,0 +1,5 @@
|
||||
---
|
||||
reth-transaction-pool: patch
|
||||
---
|
||||
|
||||
Fixed swapped arguments in `blob_tx_priority` function calls, correcting the parameter order to match the function signature.
|
||||
4
.changelog/lively-clouds-bake.md
Normal file
4
.changelog/lively-clouds-bake.md
Normal file
@@ -0,0 +1,4 @@
|
||||
---
|
||||
---
|
||||
|
||||
Improved documentation overview page with better structure and clarity.
|
||||
5
.changelog/lively-foxes-play.md
Normal file
5
.changelog/lively-foxes-play.md
Normal file
@@ -0,0 +1,5 @@
|
||||
---
|
||||
reth-node-events: patch
|
||||
---
|
||||
|
||||
Updated consensus engine log message to be more accurate about received updates.
|
||||
9
.changelog/nice-trees-drink.md
Normal file
9
.changelog/nice-trees-drink.md
Normal file
@@ -0,0 +1,9 @@
|
||||
---
|
||||
reth-network-api: minor
|
||||
reth-network-types: minor
|
||||
reth-network: minor
|
||||
reth-node-core: minor
|
||||
reth: minor
|
||||
---
|
||||
|
||||
Added optional ENR fork ID enforcement to filter out peers from incompatible networks during peer discovery, controlled by the `--enforce-enr-fork-id` CLI flag.
|
||||
5
.changelog/nice-waves-bow.md
Normal file
5
.changelog/nice-waves-bow.md
Normal file
@@ -0,0 +1,5 @@
|
||||
---
|
||||
reth-primitives: patch
|
||||
---
|
||||
|
||||
Moved feature-referenced dependencies from dev-dependencies to optional dependencies to ensure they are available when their corresponding features are enabled.
|
||||
5
.changelog/old-dogs-shake.md
Normal file
5
.changelog/old-dogs-shake.md
Normal file
@@ -0,0 +1,5 @@
|
||||
---
|
||||
reth-transaction-pool: minor
|
||||
---
|
||||
|
||||
Added `IntoIter: Send` bounds to `validate_transactions` and `validate_transactions_with_origin` in the `TransactionValidator` trait, avoiding unnecessary `Vec` collects. Simplified default `validate_transactions_with_origin` to delegate to `validate_transactions`.
|
||||
5
.changelog/quiet-frogs-whisper.md
Normal file
5
.changelog/quiet-frogs-whisper.md
Normal file
@@ -0,0 +1,5 @@
|
||||
---
|
||||
reth-provider: patch
|
||||
---
|
||||
|
||||
Removed unused staging types from ProviderFactoryBuilder.
|
||||
5
.changelog/swift-owls-fly.md
Normal file
5
.changelog/swift-owls-fly.md
Normal file
@@ -0,0 +1,5 @@
|
||||
---
|
||||
reth-trie-sparse: minor
|
||||
---
|
||||
|
||||
Removed `SerialSparseTrie` from the workspace, consolidating on `ParallelSparseTrie` as the single sparse trie implementation in `reth-trie-sparse`.
|
||||
5
.changelog/tidy-stars-cry.md
Normal file
5
.changelog/tidy-stars-cry.md
Normal file
@@ -0,0 +1,5 @@
|
||||
---
|
||||
reth-trie-sparse: patch
|
||||
---
|
||||
|
||||
Fixed a bug where trie nodes could appear in both `updated_nodes` and `removed_nodes` simultaneously by removing entries from `removed_nodes` when a node is inserted as updated.
|
||||
4
.changelog/vast-waves-fold.md
Normal file
4
.changelog/vast-waves-fold.md
Normal file
@@ -0,0 +1,4 @@
|
||||
---
|
||||
---
|
||||
|
||||
Expanded CLI integration tests with subcommand help coverage, config TOML validation, genesis JSON validation, and send transaction round-trip test for dev mode.
|
||||
5
.changelog/warm-foxes-glow.md
Normal file
5
.changelog/warm-foxes-glow.md
Normal file
@@ -0,0 +1,5 @@
|
||||
---
|
||||
reth-network: minor
|
||||
---
|
||||
|
||||
Added direction labels to `closed_sessions` and `pending_session_failures` metrics. Operators can now distinguish session closures and failures by direction (`active`, `incoming_pending`, `outgoing_pending` for closed sessions; `inbound`, `outbound` for pending session failures).
|
||||
4
.changelog/warm-foxes-jump.md
Normal file
4
.changelog/warm-foxes-jump.md
Normal file
@@ -0,0 +1,4 @@
|
||||
---
|
||||
---
|
||||
|
||||
Moved Kurtosis CI failure notifications to the hive Slack channel.
|
||||
7
.changelog/warm-geese-build.md
Normal file
7
.changelog/warm-geese-build.md
Normal file
@@ -0,0 +1,7 @@
|
||||
---
|
||||
reth-rpc-api: minor
|
||||
reth-rpc-builder: patch
|
||||
reth-rpc: minor
|
||||
---
|
||||
|
||||
Added `subscribeFinalizedChainNotifications` RPC endpoint that buffers committed chain notifications and emits them once a new finalized block is received.
|
||||
5
.changelog/zesty-clouds-wave.md
Normal file
5
.changelog/zesty-clouds-wave.md
Normal file
@@ -0,0 +1,5 @@
|
||||
---
|
||||
reth-trie: patch
|
||||
---
|
||||
|
||||
Fixed a potential panic in `ProofCalculator` by clearing internal computation state (`branch_stack`, `child_stack`, `branch_path`, etc.) after errors, preventing stale state from causing `usize` underflow panics when the calculator is reused. Added a test verifying correct behavior after simulated mid-computation errors.
|
||||
2
.github/CODEOWNERS
vendored
2
.github/CODEOWNERS
vendored
@@ -38,7 +38,7 @@ crates/storage/libmdbx-rs/ @shekhirin
|
||||
crates/storage/nippy-jar/ @joshieDo @shekhirin
|
||||
crates/storage/provider/ @joshieDo @shekhirin @yongkangc
|
||||
crates/storage/storage-api/ @joshieDo
|
||||
crates/tasks/ @mattsse
|
||||
crates/tasks/ @mattsse @DaniPopes
|
||||
crates/tokio-util/ @mattsse
|
||||
crates/tracing/ @mattsse @shekhirin
|
||||
crates/tracing-otlp/ @mattsse @Rjected
|
||||
|
||||
1
.github/scripts/check_rv32imac.sh
vendored
1
.github/scripts/check_rv32imac.sh
vendored
@@ -27,7 +27,6 @@ crates_to_check=(
|
||||
reth-ethereum-forks
|
||||
reth-ethereum-primitives
|
||||
reth-ethereum-consensus
|
||||
reth-stateless
|
||||
)
|
||||
|
||||
any_failed=0
|
||||
|
||||
1
.github/scripts/check_wasm.sh
vendored
1
.github/scripts/check_wasm.sh
vendored
@@ -63,6 +63,7 @@ exclude_crates=(
|
||||
reth-provider # tokio
|
||||
reth-prune # tokio
|
||||
reth-prune-static-files # reth-provider
|
||||
reth-tasks # tokio rt-multi-thread
|
||||
reth-stages-api # reth-provider, reth-prune
|
||||
reth-static-file # tokio
|
||||
reth-transaction-pool # c-kzg
|
||||
|
||||
1
.github/workflows/dependencies.yml
vendored
1
.github/workflows/dependencies.yml
vendored
@@ -15,6 +15,7 @@ permissions:
|
||||
|
||||
jobs:
|
||||
update:
|
||||
if: github.repository == 'paradigmxyz/reth'
|
||||
uses: tempoxyz/ci/.github/workflows/cargo-update-pr.yml@main
|
||||
secrets:
|
||||
token: ${{ secrets.GITHUB_TOKEN }}
|
||||
28
.github/workflows/docker.yml
vendored
28
.github/workflows/docker.yml
vendored
@@ -31,6 +31,7 @@ on:
|
||||
|
||||
jobs:
|
||||
build:
|
||||
if: github.repository == 'paradigmxyz/reth'
|
||||
name: Build Docker images
|
||||
runs-on: ubuntu-24.04
|
||||
permissions:
|
||||
@@ -69,18 +70,27 @@ jobs:
|
||||
# Add 'latest' tag for non-RC releases
|
||||
if [[ ! "$VERSION" =~ -rc ]]; then
|
||||
echo "ethereum_tags=${REGISTRY}/reth:${VERSION},${REGISTRY}/reth:latest" >> "$GITHUB_OUTPUT"
|
||||
{
|
||||
echo "ethereum_set<<EOF"
|
||||
echo "ethereum.tags=${REGISTRY}/reth:${VERSION}"
|
||||
echo "ethereum.tags=${REGISTRY}/reth:latest"
|
||||
echo "EOF"
|
||||
} >> "$GITHUB_OUTPUT"
|
||||
else
|
||||
echo "ethereum_tags=${REGISTRY}/reth:${VERSION}" >> "$GITHUB_OUTPUT"
|
||||
echo "ethereum_set=ethereum.tags=${REGISTRY}/reth:${VERSION}" >> "$GITHUB_OUTPUT"
|
||||
fi
|
||||
|
||||
elif [[ "${{ github.event_name }}" == "schedule" ]] || [[ "${{ inputs.build_type }}" == "nightly" ]]; then
|
||||
echo "targets=nightly" >> "$GITHUB_OUTPUT"
|
||||
echo "ethereum_tags=${REGISTRY}/reth:nightly" >> "$GITHUB_OUTPUT"
|
||||
echo "ethereum_set=ethereum.tags=${REGISTRY}/reth:nightly" >> "$GITHUB_OUTPUT"
|
||||
|
||||
else
|
||||
# git-sha build
|
||||
echo "targets=ethereum" >> "$GITHUB_OUTPUT"
|
||||
echo "ethereum_tags=${REGISTRY}/reth:${{ github.sha }}" >> "$GITHUB_OUTPUT"
|
||||
echo "ethereum_set=ethereum.tags=${REGISTRY}/reth:${{ github.sha }}" >> "$GITHUB_OUTPUT"
|
||||
fi
|
||||
|
||||
- name: Build and push images
|
||||
@@ -96,7 +106,7 @@ jobs:
|
||||
targets: ${{ steps.params.outputs.targets }}
|
||||
push: ${{ !(github.event_name == 'workflow_dispatch' && inputs.dry_run) }}
|
||||
set: |
|
||||
ethereum.tags=${{ steps.params.outputs.ethereum_tags }}
|
||||
${{ steps.params.outputs.ethereum_set }}
|
||||
|
||||
- name: Verify image architectures
|
||||
env:
|
||||
@@ -116,6 +126,18 @@ jobs:
|
||||
- name: Slack Webhook Action
|
||||
uses: rtCamp/action-slack-notify@v2
|
||||
env:
|
||||
SLACK_COLOR: ${{ job.status }}
|
||||
SLACK_MESSAGE: "Failed run: https://github.com/paradigmxyz/reth/actions/runs/${{ github.run_id }}"
|
||||
SLACK_COLOR: danger
|
||||
SLACK_ICON_EMOJI: ":rotating_light:"
|
||||
SLACK_USERNAME: "GitHub Actions"
|
||||
SLACK_TITLE: ":rotating_light: Nightly Docker Build Failed"
|
||||
SLACK_MESSAGE: |
|
||||
The scheduled nightly Docker build failed.
|
||||
|
||||
*Commit:* `${{ github.sha }}`
|
||||
*Branch:* `${{ github.ref_name }}`
|
||||
*Run:* <https://github.com/paradigmxyz/reth/actions/runs/${{ github.run_id }}|View logs>
|
||||
|
||||
*Action required:* Re-run the workflow or investigate the build failure.
|
||||
SLACK_FOOTER: "paradigmxyz/reth · docker.yml"
|
||||
MSG_MINIMAL: true
|
||||
SLACK_WEBHOOK: ${{ secrets.SLACK_WEBHOOK_URL }}
|
||||
|
||||
2
.github/workflows/e2e.yml
vendored
2
.github/workflows/e2e.yml
vendored
@@ -35,6 +35,7 @@ jobs:
|
||||
- name: Run e2e tests
|
||||
run: |
|
||||
cargo nextest run \
|
||||
--no-fail-fast \
|
||||
--locked --features "asm-keccak" \
|
||||
--workspace \
|
||||
--exclude 'example-*' \
|
||||
@@ -61,6 +62,7 @@ jobs:
|
||||
- name: Run RocksDB e2e tests
|
||||
run: |
|
||||
cargo nextest run \
|
||||
--no-fail-fast \
|
||||
--locked --features "edge" \
|
||||
-p reth-e2e-test-utils \
|
||||
-E 'binary(rocksdb)'
|
||||
|
||||
5
.github/workflows/integration.yml
vendored
5
.github/workflows/integration.yml
vendored
@@ -46,6 +46,7 @@ jobs:
|
||||
- name: Run tests
|
||||
run: |
|
||||
cargo nextest run \
|
||||
--no-fail-fast \
|
||||
--locked --features "asm-keccak ${{ matrix.network }} ${{ matrix.storage == 'edge' && 'edge' || '' }}" \
|
||||
--workspace --exclude ef-tests \
|
||||
-E "kind(test) and not binary(e2e_testsuite)"
|
||||
@@ -64,7 +65,7 @@ jobs:
|
||||
|
||||
era-files:
|
||||
name: era1 file integration tests once a day
|
||||
if: github.event_name == 'schedule'
|
||||
if: github.event_name == 'schedule' && github.repository == 'paradigmxyz/reth'
|
||||
runs-on: ubuntu-latest
|
||||
steps:
|
||||
- uses: actions/checkout@v6
|
||||
@@ -76,4 +77,4 @@ jobs:
|
||||
with:
|
||||
cache-on-failure: true
|
||||
- name: run era1 files integration tests
|
||||
run: cargo nextest run --release --package reth-era --test it -- --ignored
|
||||
run: cargo nextest run --no-fail-fast --release --package reth-era --test it -- --ignored
|
||||
|
||||
3
.github/workflows/kurtosis.yml
vendored
3
.github/workflows/kurtosis.yml
vendored
@@ -20,6 +20,7 @@ concurrency:
|
||||
|
||||
jobs:
|
||||
build-reth:
|
||||
if: github.repository == 'paradigmxyz/reth'
|
||||
uses: ./.github/workflows/docker-test.yml
|
||||
with:
|
||||
hive_target: kurtosis
|
||||
@@ -65,4 +66,4 @@ jobs:
|
||||
env:
|
||||
SLACK_COLOR: ${{ job.status }}
|
||||
SLACK_MESSAGE: "Failed run: https://github.com/paradigmxyz/reth/actions/runs/${{ github.run_id }}"
|
||||
SLACK_WEBHOOK: ${{ secrets.SLACK_WEBHOOK_URL }}
|
||||
SLACK_WEBHOOK: ${{ secrets.SLACK_HIVE_WEBHOOK_URL }}
|
||||
|
||||
1
.github/workflows/reproducible-build.yml
vendored
1
.github/workflows/reproducible-build.yml
vendored
@@ -7,6 +7,7 @@ on:
|
||||
|
||||
jobs:
|
||||
build:
|
||||
if: github.repository == 'paradigmxyz/reth'
|
||||
name: build reproducible binaries
|
||||
runs-on: ${{ matrix.runner }}
|
||||
strategy:
|
||||
|
||||
2
.github/workflows/stage.yml
vendored
2
.github/workflows/stage.yml
vendored
@@ -38,7 +38,7 @@ jobs:
|
||||
cache-on-failure: true
|
||||
- name: Build reth
|
||||
run: |
|
||||
cargo install --features asm-keccak,jemalloc --path bin/reth
|
||||
cargo install --path bin/reth
|
||||
- name: Run headers stage
|
||||
run: |
|
||||
reth stage run headers --from ${{ env.FROM_BLOCK }} --to ${{ env.TO_BLOCK }} --commit --checkpoints
|
||||
|
||||
1
.github/workflows/stale.yml
vendored
1
.github/workflows/stale.yml
vendored
@@ -9,6 +9,7 @@ on:
|
||||
|
||||
jobs:
|
||||
close-issues:
|
||||
if: github.repository == 'paradigmxyz/reth'
|
||||
runs-on: ubuntu-latest
|
||||
permissions:
|
||||
issues: write
|
||||
|
||||
1
.github/workflows/sync-era.yml
vendored
1
.github/workflows/sync-era.yml
vendored
@@ -17,6 +17,7 @@ concurrency:
|
||||
|
||||
jobs:
|
||||
sync:
|
||||
if: github.repository == 'paradigmxyz/reth'
|
||||
name: sync (${{ matrix.chain.bin }})
|
||||
runs-on: depot-ubuntu-latest
|
||||
env:
|
||||
|
||||
1
.github/workflows/sync.yml
vendored
1
.github/workflows/sync.yml
vendored
@@ -17,6 +17,7 @@ concurrency:
|
||||
|
||||
jobs:
|
||||
sync:
|
||||
if: github.repository == 'paradigmxyz/reth'
|
||||
name: sync (${{ matrix.chain.bin }})
|
||||
runs-on: depot-ubuntu-latest
|
||||
env:
|
||||
|
||||
3
.github/workflows/unit.yml
vendored
3
.github/workflows/unit.yml
vendored
@@ -49,6 +49,7 @@ jobs:
|
||||
- name: Run tests
|
||||
run: |
|
||||
cargo nextest run \
|
||||
--no-fail-fast \
|
||||
--features "${{ matrix.features }} $EDGE_FEATURES" --locked \
|
||||
${{ matrix.exclude_args }} --workspace \
|
||||
--exclude ef-tests --no-tests=warn \
|
||||
@@ -87,7 +88,7 @@ jobs:
|
||||
- uses: Swatinem/rust-cache@v2
|
||||
with:
|
||||
cache-on-failure: true
|
||||
- run: cargo nextest run --cargo-profile hivetests -p ef-tests --features "asm-keccak ef-tests"
|
||||
- run: cargo nextest run --no-fail-fast --cargo-profile hivetests -p ef-tests --features "asm-keccak ef-tests"
|
||||
|
||||
doc:
|
||||
name: doc tests
|
||||
|
||||
@@ -381,7 +381,7 @@ cargo nextest run --workspace
|
||||
cargo bench --bench bench_name
|
||||
|
||||
# Build optimized binary
|
||||
cargo build --release --features "jemalloc asm-keccak"
|
||||
cargo build --release
|
||||
|
||||
# Check compilation for all features
|
||||
cargo check --workspace --all-features
|
||||
|
||||
829
Cargo.lock
generated
829
Cargo.lock
generated
File diff suppressed because it is too large
Load Diff
116
Cargo.toml
116
Cargo.toml
@@ -1,5 +1,5 @@
|
||||
[workspace.package]
|
||||
version = "1.10.2"
|
||||
version = "1.11.3"
|
||||
edition = "2024"
|
||||
rust-version = "1.88"
|
||||
license = "MIT OR Apache-2.0"
|
||||
@@ -83,8 +83,6 @@ members = [
|
||||
"crates/prune/db",
|
||||
"crates/prune/prune",
|
||||
"crates/prune/types",
|
||||
"crates/ress/protocol",
|
||||
"crates/ress/provider",
|
||||
"crates/revm/",
|
||||
"crates/rpc/ipc/",
|
||||
"crates/rpc/rpc-api/",
|
||||
@@ -101,7 +99,6 @@ members = [
|
||||
"crates/stages/api/",
|
||||
"crates/stages/stages/",
|
||||
"crates/stages/types/",
|
||||
"crates/stateless",
|
||||
"crates/static-file/static-file",
|
||||
"crates/static-file/types/",
|
||||
"crates/storage/codecs/",
|
||||
@@ -125,7 +122,6 @@ members = [
|
||||
"crates/trie/db",
|
||||
"crates/trie/parallel/",
|
||||
"crates/trie/sparse",
|
||||
"crates/trie/sparse-parallel/",
|
||||
"crates/trie/trie",
|
||||
"examples/beacon-api-sidecar-fetcher/",
|
||||
"examples/beacon-api-sse/",
|
||||
@@ -310,6 +306,11 @@ inherits = "release"
|
||||
lto = "fat"
|
||||
codegen-units = 1
|
||||
|
||||
[profile.maxperf-symbols]
|
||||
inherits = "maxperf"
|
||||
debug = "full"
|
||||
strip = "none"
|
||||
|
||||
[profile.reproducible]
|
||||
inherits = "release"
|
||||
panic = "abort"
|
||||
@@ -418,7 +419,6 @@ reth-rpc-convert = { path = "crates/rpc/rpc-convert" }
|
||||
reth-stages = { path = "crates/stages/stages" }
|
||||
reth-stages-api = { path = "crates/stages/api" }
|
||||
reth-stages-types = { path = "crates/stages/types", default-features = false }
|
||||
reth-stateless = { path = "crates/stateless", default-features = false }
|
||||
reth-static-file = { path = "crates/static-file/static-file" }
|
||||
reth-static-file-types = { path = "crates/static-file/types", default-features = false }
|
||||
reth-storage-api = { path = "crates/storage/storage-api", default-features = false }
|
||||
@@ -434,10 +434,7 @@ reth-trie-common = { path = "crates/trie/common", default-features = false }
|
||||
reth-trie-db = { path = "crates/trie/db" }
|
||||
reth-trie-parallel = { path = "crates/trie/parallel" }
|
||||
reth-trie-sparse = { path = "crates/trie/sparse", default-features = false }
|
||||
reth-trie-sparse-parallel = { path = "crates/trie/sparse-parallel" }
|
||||
reth-zstd-compressors = { path = "crates/storage/zstd-compressors", default-features = false }
|
||||
reth-ress-protocol = { path = "crates/ress/protocol" }
|
||||
reth-ress-provider = { path = "crates/ress/provider" }
|
||||
|
||||
# revm
|
||||
revm = { version = "34.0.0", default-features = false }
|
||||
@@ -451,46 +448,57 @@ op-revm = { version = "15.0.0", default-features = false }
|
||||
revm-inspectors = "0.34.2"
|
||||
|
||||
# eth
|
||||
alloy-dyn-abi = "1.5.4"
|
||||
alloy-primitives = { version = "1.5.4", default-features = false, features = ["map-foldhash"] }
|
||||
alloy-sol-types = { version = "1.5.4", default-features = false }
|
||||
alloy-dyn-abi = "1.5.6"
|
||||
alloy-primitives = { version = "1.5.6", default-features = false, features = [
|
||||
"map-foldhash",
|
||||
] }
|
||||
alloy-sol-types = { version = "1.5.6", default-features = false }
|
||||
|
||||
alloy-chains = { version = "0.2.5", default-features = false }
|
||||
alloy-eip2124 = { version = "0.2.0", default-features = false }
|
||||
alloy-eip7928 = { version = "0.3.0", default-features = false }
|
||||
alloy-evm = { version = "0.27.2", default-features = false }
|
||||
alloy-rlp = { version = "0.3.10", default-features = false, features = ["core-net"] }
|
||||
alloy-rlp = { version = "0.3.13", default-features = false, features = [
|
||||
"core-net",
|
||||
] }
|
||||
alloy-trie = { version = "0.9.4", default-features = false }
|
||||
|
||||
alloy-hardforks = "0.4.5"
|
||||
|
||||
alloy-consensus = { version = "1.6.2", default-features = false }
|
||||
alloy-contract = { version = "1.6.2", default-features = false }
|
||||
alloy-eips = { version = "1.6.2", default-features = false }
|
||||
alloy-genesis = { version = "1.6.2", default-features = false }
|
||||
alloy-json-rpc = { version = "1.6.2", default-features = false }
|
||||
alloy-network = { version = "1.6.2", default-features = false }
|
||||
alloy-network-primitives = { version = "1.6.2", default-features = false }
|
||||
alloy-provider = { version = "1.6.2", features = ["reqwest", "debug-api"], default-features = false }
|
||||
alloy-pubsub = { version = "1.6.2", default-features = false }
|
||||
alloy-rpc-client = { version = "1.6.2", default-features = false }
|
||||
alloy-rpc-types = { version = "1.6.2", features = ["eth"], default-features = false }
|
||||
alloy-rpc-types-admin = { version = "1.6.2", default-features = false }
|
||||
alloy-rpc-types-anvil = { version = "1.6.2", default-features = false }
|
||||
alloy-rpc-types-beacon = { version = "1.6.2", default-features = false }
|
||||
alloy-rpc-types-debug = { version = "1.6.2", default-features = false }
|
||||
alloy-rpc-types-engine = { version = "1.6.2", default-features = false }
|
||||
alloy-rpc-types-eth = { version = "1.6.2", default-features = false }
|
||||
alloy-rpc-types-mev = { version = "1.6.2", default-features = false }
|
||||
alloy-rpc-types-trace = { version = "1.6.2", default-features = false }
|
||||
alloy-rpc-types-txpool = { version = "1.6.2", default-features = false }
|
||||
alloy-serde = { version = "1.6.2", default-features = false }
|
||||
alloy-signer = { version = "1.6.2", default-features = false }
|
||||
alloy-signer-local = { version = "1.6.2", default-features = false }
|
||||
alloy-transport = { version = "1.6.2" }
|
||||
alloy-transport-http = { version = "1.6.2", features = ["reqwest-rustls-tls"], default-features = false }
|
||||
alloy-transport-ipc = { version = "1.6.2", default-features = false }
|
||||
alloy-transport-ws = { version = "1.6.2", default-features = false }
|
||||
alloy-consensus = { version = "1.6.3", default-features = false }
|
||||
alloy-contract = { version = "1.6.3", default-features = false }
|
||||
alloy-eips = { version = "1.6.3", default-features = false }
|
||||
alloy-genesis = { version = "1.6.3", default-features = false }
|
||||
alloy-json-rpc = { version = "1.6.3", default-features = false }
|
||||
alloy-network = { version = "1.6.3", default-features = false }
|
||||
alloy-network-primitives = { version = "1.6.3", default-features = false }
|
||||
alloy-provider = { version = "1.6.3", features = [
|
||||
"reqwest",
|
||||
"debug-api",
|
||||
], default-features = false }
|
||||
alloy-pubsub = { version = "1.6.3", default-features = false }
|
||||
alloy-rpc-client = { version = "1.6.3", default-features = false }
|
||||
alloy-rpc-types = { version = "1.6.3", features = [
|
||||
"eth",
|
||||
], default-features = false }
|
||||
alloy-rpc-types-admin = { version = "1.6.3", default-features = false }
|
||||
alloy-rpc-types-anvil = { version = "1.6.3", default-features = false }
|
||||
alloy-rpc-types-beacon = { version = "1.6.3", default-features = false }
|
||||
alloy-rpc-types-debug = { version = "1.6.3", default-features = false }
|
||||
alloy-rpc-types-engine = { version = "1.6.3", default-features = false }
|
||||
alloy-rpc-types-eth = { version = "1.6.3", default-features = false }
|
||||
alloy-rpc-types-mev = { version = "1.6.3", default-features = false }
|
||||
alloy-rpc-types-trace = { version = "1.6.3", default-features = false }
|
||||
alloy-rpc-types-txpool = { version = "1.6.3", default-features = false }
|
||||
alloy-serde = { version = "1.6.3", default-features = false }
|
||||
alloy-signer = { version = "1.6.3", default-features = false }
|
||||
alloy-signer-local = { version = "1.6.3", default-features = false }
|
||||
alloy-transport = { version = "1.6.3" }
|
||||
alloy-transport-http = { version = "1.6.3", features = [
|
||||
"reqwest-rustls-tls",
|
||||
], default-features = false }
|
||||
alloy-transport-ipc = { version = "1.6.3", default-features = false }
|
||||
alloy-transport-ws = { version = "1.6.3", default-features = false }
|
||||
|
||||
# op
|
||||
alloy-op-evm = { version = "0.27.2", default-features = false }
|
||||
@@ -507,7 +515,10 @@ either = { version = "1.15.0", default-features = false }
|
||||
arrayvec = { version = "0.7.6", default-features = false }
|
||||
aquamarine = "0.6"
|
||||
auto_impl = "1"
|
||||
backon = { version = "1.2", default-features = false, features = ["std-blocking-sleep", "tokio-sleep"] }
|
||||
backon = { version = "1.2", default-features = false, features = [
|
||||
"std-blocking-sleep",
|
||||
"tokio-sleep",
|
||||
] }
|
||||
bincode = "1.3"
|
||||
bitflags = "2.4"
|
||||
boyer-moore-magiclen = "0.2.16"
|
||||
@@ -529,9 +540,13 @@ itertools = { version = "0.14", default-features = false }
|
||||
linked_hash_set = "0.1"
|
||||
lz4 = "1.28.1"
|
||||
modular-bitfield = "0.13.1"
|
||||
notify = { version = "8.0.0", default-features = false, features = ["macos_fsevent"] }
|
||||
notify = { version = "8.0.0", default-features = false, features = [
|
||||
"macos_fsevent",
|
||||
] }
|
||||
nybbles = { version = "0.4.8", default-features = false }
|
||||
once_cell = { version = "1.19", default-features = false, features = ["critical-section"] }
|
||||
once_cell = { version = "1.19", default-features = false, features = [
|
||||
"critical-section",
|
||||
] }
|
||||
parking_lot = "0.12"
|
||||
paste = "1.0"
|
||||
rand = "0.9"
|
||||
@@ -550,7 +565,9 @@ strum_macros = "0.27"
|
||||
syn = "2.0"
|
||||
thiserror = { version = "2.0.0", default-features = false }
|
||||
tar = "0.4.44"
|
||||
tracing = { version = "0.1.0", default-features = false }
|
||||
tracing = { version = "0.1.0", default-features = false, features = [
|
||||
"attributes",
|
||||
] }
|
||||
tracing-appender = "0.2"
|
||||
url = { version = "2.3", default-features = false }
|
||||
zstd = "0.13"
|
||||
@@ -588,7 +605,11 @@ futures-util = { version = "0.3", default-features = false }
|
||||
hyper = "1.3"
|
||||
hyper-util = "0.1.5"
|
||||
pin-project = "1.0.12"
|
||||
reqwest = { version = "0.12", default-features = false, features = ["rustls-tls", "rustls-tls-native-roots", "stream"] }
|
||||
reqwest = { version = "0.12", default-features = false, features = [
|
||||
"rustls-tls",
|
||||
"rustls-tls-native-roots",
|
||||
"stream",
|
||||
] }
|
||||
tracing-futures = "0.2"
|
||||
tower = "0.5"
|
||||
tower-http = "0.6"
|
||||
@@ -613,7 +634,10 @@ proptest-arbitrary-interop = "0.1.0"
|
||||
# crypto
|
||||
enr = { version = "0.13", default-features = false }
|
||||
k256 = { version = "0.13", default-features = false, features = ["ecdsa"] }
|
||||
secp256k1 = { version = "0.30", default-features = false, features = ["global-context", "recovery"] }
|
||||
secp256k1 = { version = "0.30", default-features = false, features = [
|
||||
"global-context",
|
||||
"recovery",
|
||||
] }
|
||||
# rand 8 for secp256k1
|
||||
rand_08 = { package = "rand", version = "0.8" }
|
||||
|
||||
|
||||
@@ -51,14 +51,15 @@ RUN --mount=type=secret,id=DEPOT_TOKEN,env=SCCACHE_WEBDAV_TOKEN \
|
||||
--mount=type=cache,target=/usr/local/cargo/registry,sharing=shared \
|
||||
--mount=type=cache,target=/usr/local/cargo/git,sharing=shared \
|
||||
--mount=type=cache,target=$SCCACHE_DIR,sharing=shared \
|
||||
export RUSTC_WRAPPER=sccache SCCACHE_WEBDAV_ENDPOINT=https://cache.depot.dev SCCACHE_DIR=/sccache && \
|
||||
sccache --start-server && \
|
||||
if [ -n "$RUSTFLAGS" ]; then \
|
||||
export RUSTFLAGS="$RUSTFLAGS"; \
|
||||
elif [ "$TARGETPLATFORM" = "linux/amd64" ]; then \
|
||||
export RUSTFLAGS="-C target-cpu=x86-64-v3 -C target-feature=+pclmulqdq"; \
|
||||
fi && \
|
||||
cargo build --profile $BUILD_PROFILE --features "$FEATURES" --locked --bin $BINARY --manifest-path $MANIFEST_PATH/Cargo.toml
|
||||
|
||||
RUN sccache --show-stats || true
|
||||
cargo build --profile $BUILD_PROFILE --features "$FEATURES" --locked --bin $BINARY --manifest-path $MANIFEST_PATH/Cargo.toml && \
|
||||
sccache --show-stats
|
||||
|
||||
# Copy binary to a known location (ARG not resolved in COPY)
|
||||
# Note: Custom profiles like maxperf/profiling output to target/<profile>/, not target/release/
|
||||
|
||||
18
Makefile
18
Makefile
@@ -12,12 +12,7 @@ FULL_DB_TOOLS_DIR := $(shell pwd)/$(DB_TOOLS_DIR)/
|
||||
CARGO_TARGET_DIR ?= target
|
||||
|
||||
# List of features to use when building. Can be overridden via the environment.
|
||||
# No jemalloc on Windows
|
||||
ifeq ($(OS),Windows_NT)
|
||||
FEATURES ?= asm-keccak min-debug-logs
|
||||
else
|
||||
FEATURES ?= jemalloc asm-keccak min-debug-logs
|
||||
endif
|
||||
FEATURES ?=
|
||||
|
||||
# Cargo profile for builds. Default is for local builds, CI uses an override.
|
||||
PROFILE ?= release
|
||||
@@ -158,7 +153,7 @@ COV_FILE := lcov.info
|
||||
.PHONY: test-unit
|
||||
test-unit: ## Run unit tests.
|
||||
cargo install cargo-nextest --locked
|
||||
cargo nextest run $(UNIT_TEST_ARGS)
|
||||
cargo nextest run --no-fail-fast $(UNIT_TEST_ARGS)
|
||||
|
||||
|
||||
.PHONY: cov-unit
|
||||
@@ -191,7 +186,7 @@ $(EEST_TESTS_DIR):
|
||||
|
||||
.PHONY: ef-tests
|
||||
ef-tests: $(EF_TESTS_DIR) $(EEST_TESTS_DIR) ## Runs Legacy and EEST tests.
|
||||
cargo nextest run -p ef-tests --release --features ef-tests
|
||||
cargo nextest run --no-fail-fast -p ef-tests --release --features ef-tests
|
||||
|
||||
##@ reth-bench
|
||||
|
||||
@@ -238,16 +233,15 @@ update-book-cli: build-debug ## Update book cli documentation.
|
||||
|
||||
.PHONY: profiling
|
||||
profiling: ## Builds `reth` with optimisations, but also symbols.
|
||||
RUSTFLAGS="-C target-cpu=native" cargo build --profile profiling --features jemalloc,asm-keccak
|
||||
RUSTFLAGS="-C target-cpu=native" cargo build --profile profiling
|
||||
|
||||
.PHONY: maxperf
|
||||
maxperf: ## Builds `reth` with the most aggressive optimisations.
|
||||
RUSTFLAGS="-C target-cpu=native" cargo build --profile maxperf --features jemalloc,asm-keccak
|
||||
RUSTFLAGS="-C target-cpu=native" cargo build --profile maxperf
|
||||
|
||||
.PHONY: maxperf-no-asm
|
||||
maxperf-no-asm: ## Builds `reth` with the most aggressive optimisations, minus the "asm-keccak" feature.
|
||||
RUSTFLAGS="-C target-cpu=native" cargo build --profile maxperf --features jemalloc
|
||||
|
||||
RUSTFLAGS="-C target-cpu=native" cargo build --profile maxperf --no-default-features --features jemalloc,min-debug-logs,otlp,otlp-logs,reth-revm/portable,js-tracer,keccak-cache-global,rocksdb
|
||||
|
||||
fmt:
|
||||
cargo +nightly fmt
|
||||
|
||||
@@ -30,7 +30,7 @@ reth-bench-compare \
|
||||
| `--draw` | Generate charts (needs Python/uv) | `false` | No |
|
||||
| `--profile` | Enable CPU profiling (needs samply) | `false` | No |
|
||||
| `-vvvv` | Debug logging | Info | No |
|
||||
| `--features <FEATURES>` | Rust features for both builds | `jemalloc,asm-keccak` | No |
|
||||
| `--features <FEATURES>` | Extra Rust features for both builds | - | No |
|
||||
| `--rustflags <FLAGS>` | RUSTFLAGS for both builds | `-C target-cpu=native` | No |
|
||||
| `--baseline-features <FEATURES>` | Features for baseline only | Inherits `--features` | No |
|
||||
| `--feature-features <FEATURES>` | Features for feature only | Inherits `--features` | No |
|
||||
|
||||
@@ -191,10 +191,9 @@ pub(crate) struct Args {
|
||||
#[arg(trailing_var_arg = true, allow_hyphen_values = true)]
|
||||
pub reth_args: Vec<String>,
|
||||
|
||||
/// Comma-separated list of features to enable during reth compilation (applied to both builds)
|
||||
///
|
||||
/// Example: `jemalloc,asm-keccak`
|
||||
#[arg(long, value_name = "FEATURES", default_value = "jemalloc,asm-keccak")]
|
||||
/// Comma-separated list of extra features to enable during reth compilation (applied to both
|
||||
/// builds)
|
||||
#[arg(long, value_name = "FEATURES", default_value = "")]
|
||||
pub features: String,
|
||||
|
||||
/// Comma-separated list of features to enable only for baseline build (overrides --features)
|
||||
@@ -205,7 +204,7 @@ pub(crate) struct Args {
|
||||
|
||||
/// Comma-separated list of features to enable only for feature build (overrides --features)
|
||||
///
|
||||
/// Example: `--feature-features jemalloc,asm-keccak`
|
||||
/// Example: `--feature-features jemalloc-prof`
|
||||
#[arg(long, value_name = "FEATURES")]
|
||||
pub feature_features: Option<String>,
|
||||
|
||||
@@ -277,10 +276,8 @@ impl Args {
|
||||
/// Get the default RPC URL for a given chain
|
||||
const fn get_default_rpc_url(chain: &Chain) -> &'static str {
|
||||
match chain.id() {
|
||||
8453 => "https://base.reth.rs/rpc", // base
|
||||
84532 => "https://base-sepolia.rpc.ithaca.xyz", // base-sepolia
|
||||
27082 => "https://rpc.hoodi.ethpandaops.io", // hoodi
|
||||
_ => "https://ethereum.reth.rs/rpc", // mainnet and fallback
|
||||
27082 => "https://rpc.hoodi.ethpandaops.io", // hoodi
|
||||
_ => "https://ethereum.reth.rs/rpc", // mainnet and fallback
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -32,7 +32,7 @@ Otherwise, running `make maxperf` at the root of the repo should be sufficient f
|
||||
The `reth-bench new-payload-fcu` command is the most representative of ethereum mainnet live sync, alternating between sending `engine_newPayload` calls and `engine_forkchoiceUpdated` calls.
|
||||
|
||||
The `new-payload-fcu` command supports two optional waiting modes that can be used together or independently:
|
||||
- `--wait-time <duration>`: Fixed sleep interval between blocks (e.g., `--wait-time 100ms`)
|
||||
- `--wait-time <duration>`: Fixed sleep interval between blocks (e.g., `--wait-time 100ms` or `--wait-time 400` for 400ms)
|
||||
- `--wait-for-persistence`: Waits for blocks to be persisted using the `reth_subscribePersistedBlock` subscription
|
||||
|
||||
When using `--wait-for-persistence`, the benchmark waits after every `(threshold + 1)` blocks, where the threshold defaults to the engine's persistence threshold (2). This can be customized with `--persistence-threshold <N>`.
|
||||
@@ -73,7 +73,7 @@ make profiling
|
||||
|
||||
If the purpose of the benchmark is to obtain `jemalloc` memory profiles that can then be analyzed by `jeprof`, it should be compiled with the `profiling` profile and the `jemalloc-prof` feature:
|
||||
```bash
|
||||
RUSTFLAGS="-C target-cpu=native" cargo build --profile profiling --features "jemalloc-prof,asm-keccak"
|
||||
RUSTFLAGS="-C target-cpu=native" cargo build --profile profiling --features "jemalloc-prof"
|
||||
```
|
||||
|
||||
> [!NOTE]
|
||||
@@ -82,7 +82,7 @@ RUSTFLAGS="-C target-cpu=native" cargo build --profile profiling --features "jem
|
||||
Finally, if the purpose of the benchmark is to profile the node when `snmalloc` is configured as the default allocator, it would be built with the following
|
||||
command:
|
||||
```bash
|
||||
RUSTFLAGS="-C target-cpu=native" cargo build --profile profiling --no-default-features --features "snmalloc-native,asm-keccak"
|
||||
RUSTFLAGS="-C target-cpu=native" cargo build --profile profiling --no-default-features --features "snmalloc-native,asm-keccak,min-debug-logs"
|
||||
```
|
||||
|
||||
### Run the Benchmark:
|
||||
|
||||
@@ -192,6 +192,15 @@ impl Command {
|
||||
parent_header = block.header;
|
||||
parent_hash = block_hash;
|
||||
blocks_processed += 1;
|
||||
|
||||
let progress = match mode {
|
||||
RampMode::Blocks(total) => format!("{blocks_processed}/{total}"),
|
||||
RampMode::TargetGasLimit(target) => {
|
||||
let pct = (parent_header.gas_limit as f64 / target as f64 * 100.0).min(100.0);
|
||||
format!("{pct:.1}%")
|
||||
}
|
||||
};
|
||||
info!(target: "reth-bench", progress, block_number = parent_header.number, gas_limit = parent_header.gas_limit, "Block processed");
|
||||
}
|
||||
|
||||
let final_gas_limit = parent_header.gas_limit;
|
||||
|
||||
@@ -2,7 +2,10 @@
|
||||
|
||||
use crate::valid_payload::call_forkchoice_updated;
|
||||
use eyre::Result;
|
||||
use std::io::{BufReader, Read};
|
||||
use std::{
|
||||
io::{BufReader, Read},
|
||||
time::Duration,
|
||||
};
|
||||
|
||||
/// Read input from either a file path or stdin.
|
||||
pub(crate) fn read_input(path: Option<&str>) -> Result<String> {
|
||||
@@ -51,6 +54,22 @@ pub(crate) fn parse_gas_limit(s: &str) -> eyre::Result<u64> {
|
||||
let base: u64 = num_str.trim().parse()?;
|
||||
base.checked_mul(multiplier).ok_or_else(|| eyre::eyre!("value overflow"))
|
||||
}
|
||||
|
||||
/// Parses a duration string, treating bare integers as milliseconds.
|
||||
///
|
||||
/// Accepts either a `humantime` duration string (e.g. `"100ms"`, `"2s"`) or a plain
|
||||
/// integer which is interpreted as milliseconds (e.g. `"400"` → 400ms).
|
||||
pub(crate) fn parse_duration(s: &str) -> eyre::Result<Duration> {
|
||||
match humantime::parse_duration(s) {
|
||||
Ok(d) => Ok(d),
|
||||
Err(_) => {
|
||||
let millis: u64 =
|
||||
s.trim().parse().map_err(|_| eyre::eyre!("invalid duration: {s:?}"))?;
|
||||
Ok(Duration::from_millis(millis))
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
use alloy_consensus::Header;
|
||||
use alloy_eips::eip4844::kzg_to_versioned_hash;
|
||||
use alloy_primitives::{Address, B256};
|
||||
@@ -270,4 +289,24 @@ mod tests {
|
||||
assert!(parse_gas_limit("G").is_err());
|
||||
assert!(parse_gas_limit("-1G").is_err());
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_parse_duration_with_unit() {
|
||||
assert_eq!(parse_duration("100ms").unwrap(), Duration::from_millis(100));
|
||||
assert_eq!(parse_duration("2s").unwrap(), Duration::from_secs(2));
|
||||
assert_eq!(parse_duration("1m").unwrap(), Duration::from_secs(60));
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_parse_duration_bare_millis() {
|
||||
assert_eq!(parse_duration("400").unwrap(), Duration::from_millis(400));
|
||||
assert_eq!(parse_duration("0").unwrap(), Duration::from_millis(0));
|
||||
assert_eq!(parse_duration("1000").unwrap(), Duration::from_millis(1000));
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_parse_duration_errors() {
|
||||
assert!(parse_duration("abc").is_err());
|
||||
assert!(parse_duration("").is_err());
|
||||
}
|
||||
}
|
||||
|
||||
@@ -12,6 +12,7 @@
|
||||
use crate::{
|
||||
bench::{
|
||||
context::BenchContext,
|
||||
helpers::parse_duration,
|
||||
output::{
|
||||
write_benchmark_results, CombinedResult, NewPayloadResult, TotalGasOutput, TotalGasRow,
|
||||
},
|
||||
@@ -25,7 +26,6 @@ use alloy_provider::Provider;
|
||||
use alloy_rpc_types_engine::ForkchoiceState;
|
||||
use clap::Parser;
|
||||
use eyre::{Context, OptionExt};
|
||||
use humantime::parse_duration;
|
||||
use reth_cli_runner::CliContext;
|
||||
use reth_engine_primitives::config::DEFAULT_PERSISTENCE_THRESHOLD;
|
||||
use reth_node_core::args::BenchmarkArgs;
|
||||
@@ -40,6 +40,9 @@ pub struct Command {
|
||||
rpc_url: String,
|
||||
|
||||
/// How long to wait after a forkchoice update before sending the next payload.
|
||||
///
|
||||
/// Accepts a duration string (e.g. `100ms`, `2s`) or a bare integer treated as
|
||||
/// milliseconds (e.g. `400`).
|
||||
#[arg(long, value_name = "WAIT_TIME", value_parser = parse_duration, verbatim_doc_comment)]
|
||||
wait_time: Option<Duration>,
|
||||
|
||||
@@ -117,7 +120,7 @@ impl Command {
|
||||
self.benchmark.ws_rpc_url.as_deref(),
|
||||
&self.benchmark.engine_rpc_url,
|
||||
)?;
|
||||
let sub = setup_persistence_subscription(ws_url).await?;
|
||||
let sub = setup_persistence_subscription(ws_url, self.persistence_timeout).await?;
|
||||
Some(PersistenceWaiter::with_duration_and_subscription(
|
||||
duration,
|
||||
sub,
|
||||
@@ -131,7 +134,7 @@ impl Command {
|
||||
self.benchmark.ws_rpc_url.as_deref(),
|
||||
&self.benchmark.engine_rpc_url,
|
||||
)?;
|
||||
let sub = setup_persistence_subscription(ws_url).await?;
|
||||
let sub = setup_persistence_subscription(ws_url, self.persistence_timeout).await?;
|
||||
Some(PersistenceWaiter::with_subscription(
|
||||
sub,
|
||||
self.persistence_threshold,
|
||||
@@ -150,6 +153,7 @@ impl Command {
|
||||
..
|
||||
} = BenchContext::new(&self.benchmark, self.rpc_url).await?;
|
||||
|
||||
let total_blocks = benchmark_mode.total_blocks();
|
||||
let buffer_size = self.rpc_block_buffer_size;
|
||||
|
||||
// Use a oneshot channel to propagate errors from the spawned task
|
||||
@@ -203,6 +207,7 @@ impl Command {
|
||||
});
|
||||
|
||||
let mut results = Vec::new();
|
||||
let mut blocks_processed = 0u64;
|
||||
let total_benchmark_duration = Instant::now();
|
||||
let mut total_wait_time = Duration::ZERO;
|
||||
|
||||
@@ -246,8 +251,13 @@ impl Command {
|
||||
|
||||
// Exclude time spent waiting on the block prefetch channel from the benchmark duration.
|
||||
// We want to measure engine throughput, not RPC fetch latency.
|
||||
blocks_processed += 1;
|
||||
let current_duration = total_benchmark_duration.elapsed() - total_wait_time;
|
||||
info!(target: "reth-bench", %combined_result);
|
||||
let progress = match total_blocks {
|
||||
Some(total) => format!("{blocks_processed}/{total}"),
|
||||
None => format!("{blocks_processed}"),
|
||||
};
|
||||
info!(target: "reth-bench", progress, %combined_result);
|
||||
|
||||
if let Some(w) = &mut waiter {
|
||||
w.on_block(block_number).await?;
|
||||
|
||||
@@ -52,6 +52,7 @@ impl Command {
|
||||
..
|
||||
} = BenchContext::new(&self.benchmark, self.rpc_url).await?;
|
||||
|
||||
let total_blocks = benchmark_mode.total_blocks();
|
||||
let buffer_size = self.rpc_block_buffer_size;
|
||||
|
||||
// Use a oneshot channel to propagate errors from the spawned task
|
||||
@@ -82,8 +83,8 @@ impl Command {
|
||||
}
|
||||
});
|
||||
|
||||
// put results in a summary vec so they can be printed at the end
|
||||
let mut results = Vec::new();
|
||||
let mut blocks_processed = 0u64;
|
||||
let total_benchmark_duration = Instant::now();
|
||||
let mut total_wait_time = Duration::ZERO;
|
||||
|
||||
@@ -105,7 +106,12 @@ impl Command {
|
||||
call_new_payload(&auth_provider, version, params).await?;
|
||||
|
||||
let new_payload_result = NewPayloadResult { gas_used, latency: start.elapsed() };
|
||||
info!(target: "reth-bench", %new_payload_result);
|
||||
blocks_processed += 1;
|
||||
let progress = match total_blocks {
|
||||
Some(total) => format!("{blocks_processed}/{total}"),
|
||||
None => format!("{blocks_processed}"),
|
||||
};
|
||||
info!(target: "reth-bench", progress, %new_payload_result);
|
||||
|
||||
// current duration since the start of the benchmark minus the time
|
||||
// waiting for blocks
|
||||
|
||||
@@ -154,12 +154,18 @@ impl PersistenceSubscription {
|
||||
}
|
||||
|
||||
/// Establishes a websocket connection and subscribes to `reth_subscribePersistedBlock`.
|
||||
///
|
||||
/// The `keepalive_interval` is set to match `persistence_timeout` so that the `WebSocket`
|
||||
/// connection is not dropped during long MDBX commits that block the server from responding
|
||||
/// to pings.
|
||||
pub(crate) async fn setup_persistence_subscription(
|
||||
ws_url: Url,
|
||||
persistence_timeout: Duration,
|
||||
) -> eyre::Result<PersistenceSubscription> {
|
||||
info!(target: "reth-bench", "Connecting to WebSocket at {} for persistence subscription", ws_url);
|
||||
|
||||
let ws_connect = WsConnect::new(ws_url.to_string());
|
||||
let ws_connect =
|
||||
WsConnect::new(ws_url.to_string()).with_keepalive_interval(persistence_timeout);
|
||||
let client = RpcClient::connect_pubsub(ws_connect)
|
||||
.await
|
||||
.wrap_err("Failed to connect to WebSocket RPC endpoint")?;
|
||||
|
||||
@@ -14,6 +14,7 @@
|
||||
use crate::{
|
||||
authenticated_transport::AuthenticatedTransportConnect,
|
||||
bench::{
|
||||
helpers::parse_duration,
|
||||
output::{
|
||||
write_benchmark_results, CombinedResult, GasRampPayloadFile, NewPayloadResult,
|
||||
TotalGasOutput, TotalGasRow,
|
||||
@@ -30,7 +31,6 @@ use alloy_rpc_client::ClientBuilder;
|
||||
use alloy_rpc_types_engine::{ExecutionPayloadEnvelopeV4, ForkchoiceState, JwtSecret};
|
||||
use clap::Parser;
|
||||
use eyre::Context;
|
||||
use humantime::parse_duration;
|
||||
use reth_cli_runner::CliContext;
|
||||
use reth_engine_primitives::config::DEFAULT_PERSISTENCE_THRESHOLD;
|
||||
use reth_node_api::EngineApiMessageVersion;
|
||||
@@ -78,6 +78,9 @@ pub struct Command {
|
||||
output: Option<PathBuf>,
|
||||
|
||||
/// How long to wait after a forkchoice update before sending the next payload.
|
||||
///
|
||||
/// Accepts a duration string (e.g. `100ms`, `2s`) or a bare integer treated as
|
||||
/// milliseconds (e.g. `400`).
|
||||
#[arg(long, value_name = "WAIT_TIME", value_parser = parse_duration, verbatim_doc_comment)]
|
||||
wait_time: Option<Duration>,
|
||||
|
||||
@@ -166,7 +169,7 @@ impl Command {
|
||||
let mut waiter = match (self.wait_time, self.wait_for_persistence) {
|
||||
(Some(duration), true) => {
|
||||
let ws_url = derive_ws_rpc_url(self.ws_rpc_url.as_deref(), &self.engine_rpc_url)?;
|
||||
let sub = setup_persistence_subscription(ws_url).await?;
|
||||
let sub = setup_persistence_subscription(ws_url, self.persistence_timeout).await?;
|
||||
Some(PersistenceWaiter::with_duration_and_subscription(
|
||||
duration,
|
||||
sub,
|
||||
@@ -177,7 +180,7 @@ impl Command {
|
||||
(Some(duration), false) => Some(PersistenceWaiter::with_duration(duration)),
|
||||
(None, true) => {
|
||||
let ws_url = derive_ws_rpc_url(self.ws_rpc_url.as_deref(), &self.engine_rpc_url)?;
|
||||
let sub = setup_persistence_subscription(ws_url).await?;
|
||||
let sub = setup_persistence_subscription(ws_url, self.persistence_timeout).await?;
|
||||
Some(PersistenceWaiter::with_subscription(
|
||||
sub,
|
||||
self.persistence_threshold,
|
||||
@@ -338,7 +341,8 @@ impl Command {
|
||||
};
|
||||
|
||||
let current_duration = total_benchmark_duration.elapsed();
|
||||
info!(target: "reth-bench", %combined_result);
|
||||
let progress = format!("{}/{}", i + 1, payloads.len());
|
||||
info!(target: "reth-bench", progress, %combined_result);
|
||||
|
||||
if let Some(w) = &mut waiter {
|
||||
w.on_block(block_number).await?;
|
||||
|
||||
@@ -20,6 +20,19 @@ impl BenchMode {
|
||||
}
|
||||
}
|
||||
|
||||
/// Returns the total number of blocks in the benchmark, if known.
|
||||
///
|
||||
/// For [`BenchMode::Range`] this is the length of the range.
|
||||
/// For [`BenchMode::Continuous`] the total is unbounded, so `None` is returned.
|
||||
pub const fn total_blocks(&self) -> Option<u64> {
|
||||
match self {
|
||||
Self::Continuous(_) => None,
|
||||
Self::Range(range) => {
|
||||
Some(range.end().saturating_sub(*range.start()).saturating_add(1))
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Create a [`BenchMode`] from optional `from` and `to` fields.
|
||||
pub fn new(from: Option<u64>, to: Option<u64>, latest_block: u64) -> Result<Self, eyre::Error> {
|
||||
// If neither `--from` nor `--to` are provided, we will run the benchmark continuously,
|
||||
|
||||
@@ -33,7 +33,6 @@ reth-chainspec.workspace = true
|
||||
reth-primitives.workspace = true
|
||||
reth-db = { workspace = true, features = ["mdbx"] }
|
||||
reth-provider.workspace = true
|
||||
reth-evm.workspace = true
|
||||
reth-revm.workspace = true
|
||||
reth-transaction-pool.workspace = true
|
||||
reth-cli-runner.workspace = true
|
||||
@@ -53,32 +52,31 @@ reth-payload-primitives.workspace = true
|
||||
reth-node-api.workspace = true
|
||||
reth-node-core.workspace = true
|
||||
reth-ethereum-payload-builder.workspace = true
|
||||
reth-ethereum-primitives.workspace = true
|
||||
reth-node-ethereum.workspace = true
|
||||
reth-node-builder.workspace = true
|
||||
reth-node-metrics.workspace = true
|
||||
reth-consensus.workspace = true
|
||||
reth-tokio-util.workspace = true
|
||||
reth-ress-protocol.workspace = true
|
||||
reth-ress-provider.workspace = true
|
||||
|
||||
# alloy
|
||||
alloy-primitives.workspace = true
|
||||
alloy-rpc-types = { workspace = true, features = ["engine"] }
|
||||
|
||||
# tracing
|
||||
tracing.workspace = true
|
||||
|
||||
# async
|
||||
tokio = { workspace = true, features = ["sync", "macros", "time", "rt-multi-thread"] }
|
||||
|
||||
# misc
|
||||
aquamarine.workspace = true
|
||||
clap = { workspace = true, features = ["derive", "env"] }
|
||||
eyre.workspace = true
|
||||
|
||||
[dev-dependencies]
|
||||
alloy-node-bindings = "1.6.3"
|
||||
alloy-provider = { workspace = true, features = ["reqwest"] }
|
||||
alloy-rpc-types-eth.workspace = true
|
||||
backon.workspace = true
|
||||
serde_json.workspace = true
|
||||
tempfile.workspace = true
|
||||
tokio = { workspace = true, features = ["macros", "rt-multi-thread"] }
|
||||
toml.workspace = true
|
||||
|
||||
[features]
|
||||
default = [
|
||||
@@ -89,6 +87,7 @@ default = [
|
||||
"js-tracer",
|
||||
"keccak-cache-global",
|
||||
"asm-keccak",
|
||||
"min-debug-logs",
|
||||
"rocksdb",
|
||||
]
|
||||
|
||||
@@ -114,10 +113,12 @@ asm-keccak = [
|
||||
"reth-primitives/asm-keccak",
|
||||
"reth-ethereum-cli/asm-keccak",
|
||||
"reth-node-ethereum/asm-keccak",
|
||||
"alloy-primitives/asm-keccak",
|
||||
]
|
||||
keccak-cache-global = [
|
||||
"reth-node-core/keccak-cache-global",
|
||||
"reth-node-ethereum/keccak-cache-global",
|
||||
"alloy-primitives/keccak-cache-global",
|
||||
]
|
||||
jemalloc = [
|
||||
"reth-cli-util/jemalloc",
|
||||
|
||||
@@ -51,6 +51,9 @@
|
||||
#![cfg_attr(not(test), warn(unused_crate_dependencies))]
|
||||
#![cfg_attr(docsrs, feature(doc_cfg))]
|
||||
|
||||
// Used in feature flags only (`asm-keccak`, `keccak-cache-global`)
|
||||
use alloy_primitives as _;
|
||||
|
||||
pub mod cli;
|
||||
|
||||
/// Re-exported utils.
|
||||
@@ -205,12 +208,9 @@ pub mod rpc {
|
||||
}
|
||||
}
|
||||
|
||||
/// Ress subprotocol installation.
|
||||
pub mod ress;
|
||||
|
||||
// re-export for convenience
|
||||
#[doc(inline)]
|
||||
pub use reth_cli_runner::{tokio_runtime, CliContext, CliRunner};
|
||||
pub use reth_cli_runner::{CliContext, CliRunner};
|
||||
|
||||
// for rendering diagrams
|
||||
use aquamarine as _;
|
||||
@@ -218,3 +218,4 @@ use aquamarine as _;
|
||||
// used in main
|
||||
use clap as _;
|
||||
use reth_cli_util as _;
|
||||
use tracing as _;
|
||||
|
||||
@@ -8,9 +8,8 @@ static ALLOC: reth_cli_util::allocator::Allocator = reth_cli_util::allocator::ne
|
||||
static MALLOC_CONF: &[u8] = b"prof:true,prof_active:true,lg_prof_sample:19\0";
|
||||
|
||||
use clap::Parser;
|
||||
use reth::{args::RessArgs, cli::Cli, ress::install_ress_subprotocol};
|
||||
use reth::cli::Cli;
|
||||
use reth_ethereum_cli::chainspec::EthereumChainSpecParser;
|
||||
use reth_node_builder::NodeHandle;
|
||||
use reth_node_ethereum::EthereumNode;
|
||||
use tracing::info;
|
||||
|
||||
@@ -22,27 +21,12 @@ fn main() {
|
||||
unsafe { std::env::set_var("RUST_BACKTRACE", "1") };
|
||||
}
|
||||
|
||||
if let Err(err) =
|
||||
Cli::<EthereumChainSpecParser, RessArgs>::parse().run(async move |builder, ress_args| {
|
||||
info!(target: "reth::cli", "Launching node");
|
||||
let NodeHandle { node, node_exit_future } =
|
||||
builder.node(EthereumNode::default()).launch_with_debug_capabilities().await?;
|
||||
if let Err(err) = Cli::<EthereumChainSpecParser>::parse().run(async move |builder, _| {
|
||||
info!(target: "reth::cli", "Launching node");
|
||||
let handle = builder.node(EthereumNode::default()).launch_with_debug_capabilities().await?;
|
||||
|
||||
// Install ress subprotocol.
|
||||
if ress_args.enabled {
|
||||
install_ress_subprotocol(
|
||||
ress_args,
|
||||
node.provider,
|
||||
node.evm_config,
|
||||
node.network,
|
||||
node.task_executor,
|
||||
node.add_ons_handle.engine_events.new_listener(),
|
||||
)?;
|
||||
}
|
||||
|
||||
node_exit_future.await
|
||||
})
|
||||
{
|
||||
handle.wait_for_node_exit().await
|
||||
}) {
|
||||
eprintln!("Error: {err:?}");
|
||||
std::process::exit(1);
|
||||
}
|
||||
|
||||
@@ -1,67 +0,0 @@
|
||||
use reth_ethereum_primitives::EthPrimitives;
|
||||
use reth_evm::ConfigureEvm;
|
||||
use reth_network::{protocol::IntoRlpxSubProtocol, NetworkProtocols};
|
||||
use reth_network_api::FullNetwork;
|
||||
use reth_node_api::ConsensusEngineEvent;
|
||||
use reth_node_core::args::RessArgs;
|
||||
use reth_provider::providers::{BlockchainProvider, ProviderNodeTypes};
|
||||
use reth_ress_protocol::{NodeType, ProtocolState, RessProtocolHandler};
|
||||
use reth_ress_provider::{maintain_pending_state, PendingState, RethRessProtocolProvider};
|
||||
use reth_tasks::TaskExecutor;
|
||||
use reth_tokio_util::EventStream;
|
||||
use tokio::sync::mpsc;
|
||||
use tracing::*;
|
||||
|
||||
/// Install `ress` subprotocol if it's enabled.
|
||||
pub fn install_ress_subprotocol<P, E, N>(
|
||||
args: RessArgs,
|
||||
provider: BlockchainProvider<P>,
|
||||
evm_config: E,
|
||||
network: N,
|
||||
task_executor: TaskExecutor,
|
||||
engine_events: EventStream<ConsensusEngineEvent<EthPrimitives>>,
|
||||
) -> eyre::Result<()>
|
||||
where
|
||||
P: ProviderNodeTypes<Primitives = EthPrimitives>,
|
||||
E: ConfigureEvm<Primitives = EthPrimitives> + Clone + 'static,
|
||||
N: FullNetwork + NetworkProtocols,
|
||||
{
|
||||
info!(target: "reth::cli", "Installing ress subprotocol");
|
||||
let pending_state = PendingState::default();
|
||||
|
||||
// Spawn maintenance task for pending state.
|
||||
task_executor.spawn(maintain_pending_state(
|
||||
engine_events,
|
||||
provider.clone(),
|
||||
pending_state.clone(),
|
||||
));
|
||||
|
||||
let (tx, mut rx) = mpsc::unbounded_channel();
|
||||
let provider = RethRessProtocolProvider::new(
|
||||
provider,
|
||||
evm_config,
|
||||
Box::new(task_executor.clone()),
|
||||
args.max_witness_window,
|
||||
args.witness_max_parallel,
|
||||
args.witness_cache_size,
|
||||
pending_state,
|
||||
)?;
|
||||
network.add_rlpx_sub_protocol(
|
||||
RessProtocolHandler {
|
||||
provider,
|
||||
node_type: NodeType::Stateful,
|
||||
peers_handle: network.peers_handle().clone(),
|
||||
max_active_connections: args.max_active_connections,
|
||||
state: ProtocolState::new(tx),
|
||||
}
|
||||
.into_rlpx_sub_protocol(),
|
||||
);
|
||||
info!(target: "reth::cli", "Ress subprotocol support enabled");
|
||||
|
||||
task_executor.spawn(async move {
|
||||
while let Some(event) = rx.recv().await {
|
||||
trace!(target: "reth::ress", ?event, "Received ress event");
|
||||
}
|
||||
});
|
||||
Ok(())
|
||||
}
|
||||
255
bin/reth/tests/it/main.rs
Normal file
255
bin/reth/tests/it/main.rs
Normal file
@@ -0,0 +1,255 @@
|
||||
#![allow(missing_docs)]
|
||||
|
||||
use std::process::Command;
|
||||
|
||||
const RETH: &str = env!("CARGO_BIN_EXE_reth");
|
||||
|
||||
// ── Helpers ──────────────────────────────────────────────────────────────────
|
||||
|
||||
/// Runs `reth <args>` and returns stdout, asserting exit code 0.
|
||||
///
|
||||
/// Tracing is suppressed via `RUST_LOG=off` so that log lines emitted during
|
||||
/// binary startup don't pollute stdout-based assertions.
|
||||
#[track_caller]
|
||||
fn reth_ok(args: &[&str]) -> String {
|
||||
let output = Command::new(RETH).env("RUST_LOG", "off").args(args).output().unwrap();
|
||||
let stdout = String::from_utf8_lossy(&output.stdout);
|
||||
let stderr = String::from_utf8_lossy(&output.stderr);
|
||||
assert!(output.status.success(), "args {args:?} failed.\nstdout: {stdout}\nstderr: {stderr}");
|
||||
stdout.into_owned()
|
||||
}
|
||||
|
||||
/// Spawns an isolated dev-mode reth node.
|
||||
///
|
||||
/// Discovery is disabled and peer limits are zeroed so the node is fully
|
||||
/// isolated. Each call gets a unique temporary data directory so that
|
||||
/// concurrent test runs never collide on the default `reth/dev/` path.
|
||||
fn spawn_dev() -> (alloy_node_bindings::RethInstance, tempfile::TempDir) {
|
||||
use alloy_node_bindings::Reth;
|
||||
|
||||
let datadir = tempfile::tempdir().expect("failed to create temp dir");
|
||||
|
||||
let instance = Reth::at(RETH)
|
||||
.dev()
|
||||
.disable_discovery()
|
||||
.data_dir(datadir.path())
|
||||
.args(["--max-outbound-peers", "0", "--max-inbound-peers", "0"])
|
||||
.spawn();
|
||||
|
||||
// Return the TempDir alongside the instance so it lives as long as the node.
|
||||
(instance, datadir)
|
||||
}
|
||||
|
||||
// ── Original tests (from PR #22069) ──────────────────────────────────────────
|
||||
|
||||
#[test]
|
||||
fn help() {
|
||||
let stdout = reth_ok(&["--help"]);
|
||||
assert!(stdout.contains("Usage"), "stdout: {stdout}");
|
||||
assert!(stdout.contains("node"), "stdout: {stdout}");
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn version() {
|
||||
let stdout = reth_ok(&["--version"]);
|
||||
assert!(stdout.to_lowercase().contains("reth"), "stdout: {stdout}");
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn node_help() {
|
||||
let stdout = reth_ok(&["node", "--help"]);
|
||||
assert!(stdout.contains("--dev"), "stdout: {stdout}");
|
||||
assert!(stdout.contains("--http"), "stdout: {stdout}");
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn unknown_subcommand() {
|
||||
let output = Command::new(RETH).arg("definitely-not-a-cmd").output().unwrap();
|
||||
assert!(!output.status.success());
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn unknown_flag() {
|
||||
let output = Command::new(RETH).args(["node", "--no-such-flag"]).output().unwrap();
|
||||
let stderr = String::from_utf8_lossy(&output.stderr);
|
||||
assert!(!output.status.success());
|
||||
assert!(stderr.contains("--no-such-flag"), "stderr: {stderr}");
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn dev_node_eth_syncing() {
|
||||
use alloy_provider::{Provider, ProviderBuilder};
|
||||
|
||||
let (reth, _datadir) = spawn_dev();
|
||||
let provider = ProviderBuilder::new().connect_http(reth.endpoint().parse().unwrap());
|
||||
|
||||
tokio::time::sleep(std::time::Duration::from_secs(1)).await;
|
||||
|
||||
let _syncing = provider.syncing().await.expect("eth_syncing failed");
|
||||
}
|
||||
|
||||
// ── Subcommand --help coverage ───────────────────────────────────────────────
|
||||
//
|
||||
// Every registered subcommand must produce valid --help output. This catches
|
||||
// clap wiring regressions (e.g. a missing field, a conflicting arg name, or a
|
||||
// broken `help_message()` call) that would otherwise only surface when a user
|
||||
// runs the command.
|
||||
|
||||
#[test]
|
||||
fn init_help() {
|
||||
let stdout = reth_ok(&["init", "--help"]);
|
||||
assert!(stdout.contains("--chain"), "stdout: {stdout}");
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn init_state_help() {
|
||||
let stdout = reth_ok(&["init-state", "--help"]);
|
||||
assert!(stdout.contains("--chain"), "stdout: {stdout}");
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn import_help() {
|
||||
let stdout = reth_ok(&["import", "--help"]);
|
||||
assert!(stdout.contains("--chain"), "stdout: {stdout}");
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn import_era_help() {
|
||||
let stdout = reth_ok(&["import-era", "--help"]);
|
||||
assert!(stdout.contains("--chain"), "stdout: {stdout}");
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn export_era_help() {
|
||||
let stdout = reth_ok(&["export-era", "--help"]);
|
||||
assert!(stdout.contains("--chain"), "stdout: {stdout}");
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn dump_genesis_help() {
|
||||
let stdout = reth_ok(&["dump-genesis", "--help"]);
|
||||
assert!(stdout.contains("--chain"), "stdout: {stdout}");
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn db_help() {
|
||||
let stdout = reth_ok(&["db", "--help"]);
|
||||
assert!(stdout.contains("stats"), "stdout: {stdout}");
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn stage_help() {
|
||||
let stdout = reth_ok(&["stage", "--help"]);
|
||||
assert!(stdout.contains("run"), "stdout: {stdout}");
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn p2p_help() {
|
||||
let stdout = reth_ok(&["p2p", "--help"]);
|
||||
assert!(stdout.contains("header"), "stdout: {stdout}");
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn config_help() {
|
||||
let stdout = reth_ok(&["config", "--help"]);
|
||||
assert!(stdout.contains("--default"), "stdout: {stdout}");
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn prune_help() {
|
||||
let stdout = reth_ok(&["prune", "--help"]);
|
||||
assert!(stdout.contains("--chain"), "stdout: {stdout}");
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn download_help() {
|
||||
let stdout = reth_ok(&["download", "--help"]);
|
||||
assert!(stdout.contains("--chain"), "stdout: {stdout}");
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn re_execute_help() {
|
||||
let stdout = reth_ok(&["re-execute", "--help"]);
|
||||
assert!(stdout.contains("--chain"), "stdout: {stdout}");
|
||||
}
|
||||
|
||||
// ── `config --default` outputs valid TOML ────────────────────────────────────
|
||||
|
||||
#[test]
|
||||
fn config_default_valid_toml() {
|
||||
let stdout = reth_ok(&["config", "--default"]);
|
||||
|
||||
let parsed: toml::Value =
|
||||
toml::from_str(&stdout).expect("config --default did not produce valid TOML");
|
||||
|
||||
// The default config must contain the [stages] table — this is the heart of
|
||||
// the pipeline configuration and its absence would indicate a serialization
|
||||
// regression.
|
||||
assert!(parsed.get("stages").is_some(), "missing [stages] in config output");
|
||||
}
|
||||
|
||||
// ── `dump-genesis` outputs valid JSON ────────────────────────────────────────
|
||||
|
||||
#[test]
|
||||
fn dump_genesis_mainnet_valid_json() {
|
||||
let stdout = reth_ok(&["dump-genesis"]);
|
||||
|
||||
let genesis: serde_json::Value =
|
||||
serde_json::from_str(&stdout).expect("dump-genesis did not produce valid JSON");
|
||||
|
||||
assert!(genesis.get("nonce").is_some(), "missing nonce in genesis JSON");
|
||||
assert!(genesis.get("alloc").is_some(), "missing alloc in genesis JSON");
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn dump_genesis_sepolia_valid_json() {
|
||||
let stdout = reth_ok(&["dump-genesis", "--chain", "sepolia"]);
|
||||
|
||||
let genesis: serde_json::Value = serde_json::from_str(&stdout)
|
||||
.expect("dump-genesis --chain sepolia did not produce valid JSON");
|
||||
|
||||
assert!(genesis.get("alloc").is_some(), "missing alloc in sepolia genesis JSON");
|
||||
}
|
||||
|
||||
// ── Dev node: send transaction round-trip ────────────────────────────────────
|
||||
//
|
||||
// Exercises the full pipeline: RPC submission → mempool → sealing → execution →
|
||||
// receipt retrieval. Uses the pre-funded dev account so no genesis customization
|
||||
// is required.
|
||||
|
||||
#[tokio::test]
|
||||
async fn dev_node_send_tx_and_mine() {
|
||||
use alloy_primitives::{Address, U256};
|
||||
use alloy_provider::{Provider, ProviderBuilder};
|
||||
use alloy_rpc_types_eth::TransactionRequest;
|
||||
|
||||
let (reth, _datadir) = spawn_dev();
|
||||
let provider = ProviderBuilder::new().connect_http(reth.endpoint().parse().unwrap());
|
||||
|
||||
tokio::time::sleep(std::time::Duration::from_secs(1)).await;
|
||||
|
||||
// Dev mode pre-funds the first dev account.
|
||||
let accounts = provider.get_accounts().await.expect("eth_accounts failed");
|
||||
assert!(!accounts.is_empty(), "dev node should expose at least one account");
|
||||
|
||||
let sender = accounts[0];
|
||||
let recipient = Address::with_last_byte(0x42);
|
||||
|
||||
let tx = TransactionRequest::default().from(sender).to(recipient).value(U256::from(1_000_000));
|
||||
|
||||
let tx_hash = provider.send_transaction(tx).await.expect("eth_sendTransaction failed");
|
||||
|
||||
// In dev/instant-mine mode the node seals a block for each transaction, so
|
||||
// the receipt becomes available almost immediately.
|
||||
let receipt = tx_hash.get_receipt().await.expect("failed to get receipt");
|
||||
|
||||
assert!(receipt.status(), "transaction should have succeeded");
|
||||
assert_eq!(receipt.to, Some(recipient));
|
||||
assert!(receipt.block_number.unwrap() > 0, "receipt should be in a mined block");
|
||||
|
||||
// Verify the transfer actually mutated state.
|
||||
let balance = provider.get_balance(recipient).await.expect("eth_getBalance failed");
|
||||
assert_eq!(balance, U256::from(1_000_000));
|
||||
}
|
||||
|
||||
const fn main() {}
|
||||
@@ -312,6 +312,11 @@ impl DeferredTrieData {
|
||||
/// Given that invariant, circular wait dependencies are impossible.
|
||||
#[instrument(level = "debug", target = "engine::tree::deferred_trie", skip_all)]
|
||||
pub fn wait_cloned(&self) -> ComputedTrieData {
|
||||
#[cfg(feature = "rayon")]
|
||||
debug_assert!(
|
||||
rayon::current_thread_index().is_none(),
|
||||
"wait_cloned must not be called from a rayon worker thread"
|
||||
);
|
||||
let mut state = self.state.lock();
|
||||
match &mut *state {
|
||||
// If the deferred trie data is ready, return the cached result.
|
||||
|
||||
@@ -1061,6 +1061,14 @@ mod tests {
|
||||
) -> ProviderResult<Option<StorageValue>> {
|
||||
Ok(None)
|
||||
}
|
||||
|
||||
fn storage_by_hashed_key(
|
||||
&self,
|
||||
_address: Address,
|
||||
_hashed_storage_key: StorageKey,
|
||||
) -> ProviderResult<Option<StorageValue>> {
|
||||
Ok(None)
|
||||
}
|
||||
}
|
||||
|
||||
impl BytecodeReader for MockStateProvider {
|
||||
|
||||
@@ -223,6 +223,26 @@ impl<N: NodePrimitives> StateProvider for MemoryOverlayStateProviderRef<'_, N> {
|
||||
|
||||
self.historical.storage(address, storage_key)
|
||||
}
|
||||
|
||||
fn storage_by_hashed_key(
|
||||
&self,
|
||||
address: Address,
|
||||
hashed_storage_key: StorageKey,
|
||||
) -> ProviderResult<Option<StorageValue>> {
|
||||
let hashed_address = keccak256(address);
|
||||
let state = &self.trie_input().state;
|
||||
|
||||
if let Some(hs) = state.storages.get(&hashed_address) {
|
||||
if let Some(value) = hs.storage.get(&hashed_storage_key) {
|
||||
return Ok(Some(*value));
|
||||
}
|
||||
if hs.wiped {
|
||||
return Ok(Some(StorageValue::ZERO));
|
||||
}
|
||||
}
|
||||
|
||||
self.historical.storage_by_hashed_key(address, hashed_storage_key)
|
||||
}
|
||||
}
|
||||
|
||||
impl<N: NodePrimitives> BytecodeReader for MemoryOverlayStateProviderRef<'_, N> {
|
||||
|
||||
@@ -66,7 +66,8 @@ pub trait RethCli: Sized {
|
||||
F: FnOnce(Self, CliRunner) -> R,
|
||||
{
|
||||
let cli = Self::parse_args()?;
|
||||
let runner = CliRunner::try_default_runtime()?;
|
||||
let runner = CliRunner::try_default_runtime()
|
||||
.map_err(|e| Error::raw(clap::error::ErrorKind::Io, e))?;
|
||||
Ok(cli.with_runner(f, runner))
|
||||
}
|
||||
|
||||
|
||||
@@ -134,4 +134,4 @@ arbitrary = [
|
||||
]
|
||||
|
||||
rocksdb = ["reth-db-common/rocksdb", "reth-stages/rocksdb", "reth-provider/rocksdb", "reth-prune/rocksdb"]
|
||||
edge = ["rocksdb", "reth-db-common/edge", "reth-provider/edge"]
|
||||
edge = ["rocksdb"]
|
||||
|
||||
@@ -19,7 +19,7 @@ use reth_node_builder::{
|
||||
Node, NodeComponents, NodeComponentsBuilder, NodeTypes, NodeTypesWithDBAdapter,
|
||||
};
|
||||
use reth_node_core::{
|
||||
args::{DatabaseArgs, DatadirArgs, RocksDbArgs, StaticFilesArgs, StorageArgs},
|
||||
args::{DatabaseArgs, DatadirArgs, StaticFilesArgs, StorageArgs},
|
||||
dirs::{ChainPath, DataDirPath},
|
||||
};
|
||||
use reth_provider::{
|
||||
@@ -67,70 +67,35 @@ pub struct EnvironmentArgs<C: ChainSpecParser> {
|
||||
#[command(flatten)]
|
||||
pub static_files: StaticFilesArgs,
|
||||
|
||||
/// All `RocksDB` related arguments
|
||||
#[command(flatten)]
|
||||
pub rocksdb: RocksDbArgs,
|
||||
|
||||
/// Storage mode configuration (v2 vs v1/legacy)
|
||||
#[command(flatten)]
|
||||
pub storage: StorageArgs,
|
||||
}
|
||||
|
||||
impl<C: ChainSpecParser> EnvironmentArgs<C> {
|
||||
/// Returns the effective storage settings derived from `--storage.v2`, static-file, and
|
||||
/// `RocksDB` CLI args.
|
||||
/// Returns the effective storage settings derived from `--storage.v2`.
|
||||
///
|
||||
/// The base storage mode is determined by `--storage.v2`:
|
||||
/// - When `--storage.v2` is set: uses [`StorageSettings::v2()`] defaults
|
||||
/// - Otherwise: uses [`StorageSettings::v1()`] defaults
|
||||
///
|
||||
/// Individual `--static-files.*` and `--rocksdb.*` flags override the base when explicitly set.
|
||||
/// - Otherwise: uses [`StorageSettings::base()`] defaults
|
||||
pub fn storage_settings(&self) -> StorageSettings {
|
||||
let mut s = if self.storage.v2 { StorageSettings::v2() } else { StorageSettings::base() };
|
||||
|
||||
// Apply static files overrides (only when explicitly set)
|
||||
if let Some(v) = self.static_files.receipts {
|
||||
s = s.with_receipts_in_static_files(v);
|
||||
if self.storage.v2 {
|
||||
StorageSettings::v2()
|
||||
} else {
|
||||
StorageSettings::base()
|
||||
}
|
||||
if let Some(v) = self.static_files.transaction_senders {
|
||||
s = s.with_transaction_senders_in_static_files(v);
|
||||
}
|
||||
if let Some(v) = self.static_files.account_changesets {
|
||||
s = s.with_account_changesets_in_static_files(v);
|
||||
}
|
||||
if let Some(v) = self.static_files.storage_changesets {
|
||||
s = s.with_storage_changesets_in_static_files(v);
|
||||
}
|
||||
|
||||
// Apply rocksdb overrides
|
||||
// --rocksdb.all sets all rocksdb flags to true
|
||||
if self.rocksdb.all {
|
||||
s = s
|
||||
.with_transaction_hash_numbers_in_rocksdb(true)
|
||||
.with_storages_history_in_rocksdb(true)
|
||||
.with_account_history_in_rocksdb(true);
|
||||
}
|
||||
|
||||
// Individual rocksdb flags override --rocksdb.all when explicitly set
|
||||
if let Some(v) = self.rocksdb.tx_hash {
|
||||
s = s.with_transaction_hash_numbers_in_rocksdb(v);
|
||||
}
|
||||
if let Some(v) = self.rocksdb.storages_history {
|
||||
s = s.with_storages_history_in_rocksdb(v);
|
||||
}
|
||||
if let Some(v) = self.rocksdb.account_history {
|
||||
s = s.with_account_history_in_rocksdb(v);
|
||||
}
|
||||
|
||||
s
|
||||
}
|
||||
|
||||
/// Initializes environment according to [`AccessRights`] and returns an instance of
|
||||
/// [`Environment`].
|
||||
///
|
||||
/// Internally builds a [`reth_tasks::Runtime`] attached to the current tokio handle for
|
||||
/// parallel storage I/O.
|
||||
pub fn init<N: CliNodeTypes>(&self, access: AccessRights) -> eyre::Result<Environment<N>>
|
||||
where
|
||||
C: ChainSpecParser<ChainSpec = N::ChainSpec>,
|
||||
{
|
||||
let runtime = reth_tasks::Runtime::with_existing_handle(tokio::runtime::Handle::current())?;
|
||||
let data_dir = self.datadir.clone().resolve_datadir(self.chain.chain());
|
||||
let db_path = data_dir.db();
|
||||
let sf_path = data_dir.static_files();
|
||||
@@ -186,7 +151,7 @@ impl<C: ChainSpecParser> EnvironmentArgs<C> {
|
||||
.build()?;
|
||||
|
||||
let provider_factory =
|
||||
self.create_provider_factory(&config, db, sfp, rocksdb_provider, access)?;
|
||||
self.create_provider_factory(&config, db, sfp, rocksdb_provider, access, runtime)?;
|
||||
if access.is_read_write() {
|
||||
debug!(target: "reth::cli", chain=%self.chain.chain(), genesis=?self.chain.genesis_hash(), "Initializing genesis");
|
||||
init_genesis_with_settings(&provider_factory, self.storage_settings())?;
|
||||
@@ -207,6 +172,7 @@ impl<C: ChainSpecParser> EnvironmentArgs<C> {
|
||||
static_file_provider: StaticFileProvider<N::Primitives>,
|
||||
rocksdb_provider: RocksDBProvider,
|
||||
access: AccessRights,
|
||||
runtime: reth_tasks::Runtime,
|
||||
) -> eyre::Result<ProviderFactory<NodeTypesWithDBAdapter<N, DatabaseEnv>>>
|
||||
where
|
||||
C: ChainSpecParser<ChainSpec = N::ChainSpec>,
|
||||
@@ -217,6 +183,7 @@ impl<C: ChainSpecParser> EnvironmentArgs<C> {
|
||||
self.chain.clone(),
|
||||
static_file_provider,
|
||||
rocksdb_provider,
|
||||
runtime,
|
||||
)?
|
||||
.with_prune_modes(prune_modes.clone());
|
||||
|
||||
|
||||
@@ -5,6 +5,7 @@ use reth_codecs::Compact;
|
||||
use reth_db_api::{cursor::DbDupCursorRO, database::Database, tables, transaction::DbTx};
|
||||
use reth_db_common::DbTool;
|
||||
use reth_node_builder::NodeTypesWithDB;
|
||||
use reth_storage_api::StorageSettingsCache;
|
||||
use std::time::{Duration, Instant};
|
||||
use tracing::info;
|
||||
|
||||
@@ -16,26 +17,22 @@ const LOG_INTERVAL: Duration = Duration::from_secs(5);
|
||||
pub struct Command {
|
||||
/// The account address to check storage for
|
||||
address: Address,
|
||||
|
||||
/// Use hashed state tables (HashedStorages) instead of plain state
|
||||
#[arg(long)]
|
||||
hashed: bool,
|
||||
}
|
||||
|
||||
impl Command {
|
||||
/// Execute `db account-storage` command
|
||||
pub fn execute<N: NodeTypesWithDB>(self, tool: &DbTool<N>) -> eyre::Result<()> {
|
||||
let address = self.address;
|
||||
let use_hashed = self.hashed;
|
||||
let hashed_address = keccak256(address);
|
||||
let use_hashed_state = tool.provider_factory.cached_storage_settings().use_hashed_state();
|
||||
|
||||
let (slot_count, storage_size) = tool.provider_factory.db_ref().view(|tx| {
|
||||
let mut count = 0usize;
|
||||
let mut total_value_bytes = 0usize;
|
||||
let mut last_log = Instant::now();
|
||||
|
||||
if use_hashed {
|
||||
let (slot_count, storage_size) = if use_hashed_state {
|
||||
let hashed_address = keccak256(address);
|
||||
tool.provider_factory.db_ref().view(|tx| {
|
||||
let mut cursor = tx.cursor_dup_read::<tables::HashedStorages>()?;
|
||||
let mut count = 0usize;
|
||||
let mut total_value_bytes = 0usize;
|
||||
let mut last_log = Instant::now();
|
||||
|
||||
let walker = cursor.walk_dup(Some(hashed_address), None)?;
|
||||
for entry in walker {
|
||||
let (_, storage_entry) = entry?;
|
||||
@@ -47,7 +44,7 @@ impl Command {
|
||||
if last_log.elapsed() >= LOG_INTERVAL {
|
||||
info!(
|
||||
target: "reth::cli",
|
||||
hashed_address = %hashed_address,
|
||||
address = %address,
|
||||
slots = count,
|
||||
key = %storage_entry.key,
|
||||
"Processing hashed storage slots"
|
||||
@@ -55,16 +52,25 @@ impl Command {
|
||||
last_log = Instant::now();
|
||||
}
|
||||
}
|
||||
// HashedStorages uses 32-byte B256 key
|
||||
|
||||
let total_size = if count > 0 { 32 + total_value_bytes } else { 0 };
|
||||
|
||||
Ok::<_, eyre::Report>((count, total_size))
|
||||
} else {
|
||||
})??
|
||||
} else {
|
||||
tool.provider_factory.db_ref().view(|tx| {
|
||||
let mut cursor = tx.cursor_dup_read::<tables::PlainStorageState>()?;
|
||||
let mut count = 0usize;
|
||||
let mut total_value_bytes = 0usize;
|
||||
let mut last_log = Instant::now();
|
||||
|
||||
// Walk all storage entries for this address
|
||||
let walker = cursor.walk_dup(Some(address), None)?;
|
||||
for entry in walker {
|
||||
let (_, storage_entry) = entry?;
|
||||
count += 1;
|
||||
let mut buf = Vec::new();
|
||||
// StorageEntry encodes as: 32 bytes (key/subkey uncompressed) + compressed U256
|
||||
let entry_len = storage_entry.to_compact(&mut buf);
|
||||
total_value_bytes += entry_len;
|
||||
|
||||
@@ -79,24 +85,26 @@ impl Command {
|
||||
last_log = Instant::now();
|
||||
}
|
||||
}
|
||||
// PlainStorageState uses 20-byte Address key
|
||||
let total_size = if count > 0 { 20 + total_value_bytes } else { 0 };
|
||||
Ok::<_, eyre::Report>((count, total_size))
|
||||
}
|
||||
})??;
|
||||
|
||||
let state_source = if use_hashed { "hashed" } else { "plain" };
|
||||
// Add 20 bytes for the Address key (stored once per account in dupsort)
|
||||
let total_size = if count > 0 { 20 + total_value_bytes } else { 0 };
|
||||
|
||||
Ok::<_, eyre::Report>((count, total_size))
|
||||
})??
|
||||
};
|
||||
|
||||
let hashed_address = keccak256(address);
|
||||
|
||||
println!("Account: {address}");
|
||||
println!("Hashed address: {hashed_address}");
|
||||
println!("State source: {state_source}");
|
||||
println!("Storage slots: {slot_count}");
|
||||
println!("Storage size: {} (estimated)", human_bytes(storage_size as f64));
|
||||
|
||||
if !use_hashed {
|
||||
// When querying plain state, also estimate what hashed would be
|
||||
if use_hashed_state {
|
||||
println!("Hashed storage size: {} (estimated)", human_bytes(storage_size as f64));
|
||||
} else {
|
||||
// Estimate hashed storage size: 32-byte B256 key instead of 20-byte Address
|
||||
let hashed_size_estimate = if slot_count > 0 { storage_size + 12 } else { 0 };
|
||||
let total_estimate = storage_size + hashed_size_estimate;
|
||||
println!("Plain storage size: {} (estimated)", human_bytes(storage_size as f64));
|
||||
println!(
|
||||
"Hashed storage size: {} (estimated)",
|
||||
human_bytes(hashed_size_estimate as f64)
|
||||
@@ -123,17 +131,5 @@ mod tests {
|
||||
cmd.address,
|
||||
"0xd8dA6BF26964aF9D7eEd9e03E53415D37aA96045".parse::<Address>().unwrap()
|
||||
);
|
||||
assert!(!cmd.hashed);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn parse_hashed_flag() {
|
||||
let cmd = Command::try_parse_from([
|
||||
"account-storage",
|
||||
"0xd8dA6BF26964aF9D7eEd9e03E53415D37aA96045",
|
||||
"--hashed",
|
||||
])
|
||||
.unwrap();
|
||||
assert!(cmd.hashed);
|
||||
}
|
||||
}
|
||||
|
||||
61
crates/cli/commands/src/db/copy.rs
Normal file
61
crates/cli/commands/src/db/copy.rs
Normal file
@@ -0,0 +1,61 @@
|
||||
use clap::Parser;
|
||||
use reth_db::mdbx::{self, ffi};
|
||||
use std::path::PathBuf;
|
||||
|
||||
/// Copies the MDBX database to a new location.
|
||||
///
|
||||
/// Equivalent to the standalone `mdbx_copy` tool but bundled into reth.
|
||||
#[derive(Parser, Debug)]
|
||||
pub struct Command {
|
||||
/// Destination path for the database copy.
|
||||
dest: PathBuf,
|
||||
|
||||
/// Compact the database while copying (reclaims free space).
|
||||
#[arg(short, long)]
|
||||
compact: bool,
|
||||
|
||||
/// Force dynamic size for the destination database.
|
||||
#[arg(short = 'd', long)]
|
||||
force_dynamic_size: bool,
|
||||
|
||||
/// Throttle to avoid MVCC pressure on writers.
|
||||
#[arg(short = 'p', long)]
|
||||
throttle_mvcc: bool,
|
||||
}
|
||||
|
||||
impl Command {
|
||||
/// Execute `db copy` command
|
||||
pub fn execute(self, db: &mdbx::DatabaseEnv) -> eyre::Result<()> {
|
||||
let mut flags: ffi::MDBX_copy_flags_t = ffi::MDBX_CP_DEFAULTS;
|
||||
if self.compact {
|
||||
flags |= ffi::MDBX_CP_COMPACT;
|
||||
}
|
||||
if self.force_dynamic_size {
|
||||
flags |= ffi::MDBX_CP_FORCE_DYNAMIC_SIZE;
|
||||
}
|
||||
if self.throttle_mvcc {
|
||||
flags |= ffi::MDBX_CP_THROTTLE_MVCC;
|
||||
}
|
||||
|
||||
let dest = self
|
||||
.dest
|
||||
.to_str()
|
||||
.ok_or_else(|| eyre::eyre!("destination path must be valid UTF-8"))?;
|
||||
let dest_cstr = std::ffi::CString::new(dest)?;
|
||||
|
||||
println!("Copying database to {} ...", self.dest.display());
|
||||
|
||||
let rc = db.with_raw_env_ptr(|env_ptr| unsafe {
|
||||
ffi::mdbx_env_copy(env_ptr, dest_cstr.as_ptr(), flags)
|
||||
});
|
||||
|
||||
if rc != 0 {
|
||||
eyre::bail!("mdbx_env_copy failed with error code {rc}: {}", unsafe {
|
||||
std::ffi::CStr::from_ptr(ffi::mdbx_strerror(rc)).to_string_lossy()
|
||||
});
|
||||
}
|
||||
|
||||
println!("Done.");
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
@@ -98,7 +98,8 @@ impl Command {
|
||||
)?;
|
||||
|
||||
if let Some(entry) = entry {
|
||||
println!("{}", serde_json::to_string_pretty(&entry)?);
|
||||
let se: reth_primitives_traits::StorageEntry = entry.into();
|
||||
println!("{}", serde_json::to_string_pretty(&se)?);
|
||||
} else {
|
||||
error!(target: "reth::cli", "No content for the given table key.");
|
||||
}
|
||||
@@ -106,7 +107,14 @@ impl Command {
|
||||
}
|
||||
|
||||
let changesets = provider.storage_changeset(key.block_number())?;
|
||||
println!("{}", serde_json::to_string_pretty(&changesets)?);
|
||||
let serializable: Vec<_> = changesets
|
||||
.into_iter()
|
||||
.map(|(addr, entry)| {
|
||||
let se: reth_primitives_traits::StorageEntry = entry.into();
|
||||
(addr, se)
|
||||
})
|
||||
.collect();
|
||||
println!("{}", serde_json::to_string_pretty(&serializable)?);
|
||||
return Ok(());
|
||||
}
|
||||
|
||||
|
||||
@@ -12,6 +12,7 @@ use std::{
|
||||
mod account_storage;
|
||||
mod checksum;
|
||||
mod clear;
|
||||
mod copy;
|
||||
mod diff;
|
||||
mod get;
|
||||
mod list;
|
||||
@@ -42,6 +43,8 @@ pub enum Subcommands {
|
||||
List(list::Command),
|
||||
/// Calculates the content checksum of a table or static file segment
|
||||
Checksum(checksum::Command),
|
||||
/// Copies the MDBX database to a new location (bundled mdbx_copy)
|
||||
Copy(copy::Command),
|
||||
/// Create a diff between two database tables or two entire databases.
|
||||
Diff(diff::Command),
|
||||
/// Gets the content of a table for the given key
|
||||
@@ -70,23 +73,23 @@ pub enum Subcommands {
|
||||
State(state::Command),
|
||||
}
|
||||
|
||||
/// Initializes a provider factory with specified access rights, and then execute with the provided
|
||||
/// command
|
||||
macro_rules! db_exec {
|
||||
($env:expr, $tool:ident, $N:ident, $access_rights:expr, $command:block) => {
|
||||
let Environment { provider_factory, .. } = $env.init::<$N>($access_rights)?;
|
||||
|
||||
let $tool = DbTool::new(provider_factory)?;
|
||||
$command;
|
||||
};
|
||||
}
|
||||
|
||||
impl<C: ChainSpecParser<ChainSpec: EthChainSpec + EthereumHardforks>> Command<C> {
|
||||
/// Execute `db` command
|
||||
pub async fn execute<N: CliNodeTypes<ChainSpec = C::ChainSpec>>(
|
||||
self,
|
||||
ctx: CliContext,
|
||||
) -> eyre::Result<()> {
|
||||
/// Initializes a provider factory with specified access rights, and then executes the
|
||||
/// provided command.
|
||||
macro_rules! db_exec {
|
||||
($env:expr, $tool:ident, $N:ident, $access_rights:expr, $command:block) => {
|
||||
let Environment { provider_factory, .. } = $env.init::<$N>($access_rights)?;
|
||||
|
||||
let $tool = DbTool::new(provider_factory)?;
|
||||
$command;
|
||||
};
|
||||
}
|
||||
|
||||
let data_dir = self.env.datadir.clone().resolve_datadir(self.env.chain.chain());
|
||||
let db_path = data_dir.db();
|
||||
let static_files_path = data_dir.static_files();
|
||||
@@ -124,6 +127,11 @@ impl<C: ChainSpecParser<ChainSpec: EthChainSpec + EthereumHardforks>> Command<C>
|
||||
command.execute(&tool)?;
|
||||
});
|
||||
}
|
||||
Subcommands::Copy(command) => {
|
||||
db_exec!(self.env, tool, N, AccessRights::RO, {
|
||||
command.execute(tool.provider_factory.db_ref())?;
|
||||
});
|
||||
}
|
||||
Subcommands::Diff(command) => {
|
||||
db_exec!(self.env, tool, N, AccessRights::RO, {
|
||||
command.execute(&tool)?;
|
||||
|
||||
@@ -64,7 +64,7 @@ impl Command {
|
||||
let executor = task_executor.clone();
|
||||
let pprof_dump_dir = data_dir.pprof_dumps();
|
||||
|
||||
let handle = task_executor.spawn_critical("metrics server", async move {
|
||||
let handle = task_executor.spawn_critical_task("metrics server", async move {
|
||||
let config = MetricServerConfig::new(
|
||||
listen_addr,
|
||||
VersionInfo {
|
||||
|
||||
@@ -39,38 +39,12 @@ enum Subcommands {
|
||||
#[derive(Debug, Clone, Copy, Subcommand)]
|
||||
#[clap(rename_all = "snake_case")]
|
||||
pub enum SetCommand {
|
||||
/// Store receipts in static files instead of the database
|
||||
Receipts {
|
||||
#[clap(action(ArgAction::Set))]
|
||||
value: bool,
|
||||
},
|
||||
/// Store transaction senders in static files instead of the database
|
||||
TransactionSenders {
|
||||
#[clap(action(ArgAction::Set))]
|
||||
value: bool,
|
||||
},
|
||||
/// Store account changesets in static files instead of the database
|
||||
AccountChangesets {
|
||||
#[clap(action(ArgAction::Set))]
|
||||
value: bool,
|
||||
},
|
||||
/// Store storage history in rocksdb instead of MDBX
|
||||
StoragesHistory {
|
||||
#[clap(action(ArgAction::Set))]
|
||||
value: bool,
|
||||
},
|
||||
/// Store transaction hash to number mapping in rocksdb instead of MDBX
|
||||
TransactionHashNumbers {
|
||||
#[clap(action(ArgAction::Set))]
|
||||
value: bool,
|
||||
},
|
||||
/// Store account history in rocksdb instead of MDBX
|
||||
AccountHistory {
|
||||
#[clap(action(ArgAction::Set))]
|
||||
value: bool,
|
||||
},
|
||||
/// Store storage changesets in static files instead of the database
|
||||
StorageChangesets {
|
||||
/// Enable or disable v2 storage layout
|
||||
///
|
||||
/// When enabled, uses static files for receipts/senders/changesets and RocksDB for
|
||||
/// history indices and transaction hashes. When disabled, uses v1/legacy layout (everything in
|
||||
/// MDBX).
|
||||
V2 {
|
||||
#[clap(action(ArgAction::Set))]
|
||||
value: bool,
|
||||
},
|
||||
@@ -113,74 +87,18 @@ impl Command {
|
||||
println!("No storage settings found, creating new settings.");
|
||||
}
|
||||
|
||||
let mut settings @ StorageSettings {
|
||||
receipts_in_static_files: _,
|
||||
transaction_senders_in_static_files: _,
|
||||
storages_history_in_rocksdb: _,
|
||||
transaction_hash_numbers_in_rocksdb: _,
|
||||
account_history_in_rocksdb: _,
|
||||
account_changesets_in_static_files: _,
|
||||
storage_changesets_in_static_files: _,
|
||||
use_hashed_state: _,
|
||||
} = settings.unwrap_or_else(StorageSettings::v1);
|
||||
let mut settings @ StorageSettings { storage_v2: _ } =
|
||||
settings.unwrap_or_else(StorageSettings::v1);
|
||||
|
||||
// Update the setting based on the key
|
||||
match cmd {
|
||||
SetCommand::Receipts { value } => {
|
||||
if settings.receipts_in_static_files == value {
|
||||
println!("receipts_in_static_files is already set to {}", value);
|
||||
SetCommand::V2 { value } => {
|
||||
if settings.storage_v2 == value {
|
||||
println!("storage_v2 is already set to {}", value);
|
||||
return Ok(());
|
||||
}
|
||||
settings.receipts_in_static_files = value;
|
||||
println!("Set receipts_in_static_files = {}", value);
|
||||
}
|
||||
SetCommand::TransactionSenders { value } => {
|
||||
if settings.transaction_senders_in_static_files == value {
|
||||
println!("transaction_senders_in_static_files is already set to {}", value);
|
||||
return Ok(());
|
||||
}
|
||||
settings.transaction_senders_in_static_files = value;
|
||||
println!("Set transaction_senders_in_static_files = {}", value);
|
||||
}
|
||||
SetCommand::AccountChangesets { value } => {
|
||||
if settings.account_changesets_in_static_files == value {
|
||||
println!("account_changesets_in_static_files is already set to {}", value);
|
||||
return Ok(());
|
||||
}
|
||||
settings.account_changesets_in_static_files = value;
|
||||
println!("Set account_changesets_in_static_files = {}", value);
|
||||
}
|
||||
SetCommand::StoragesHistory { value } => {
|
||||
if settings.storages_history_in_rocksdb == value {
|
||||
println!("storages_history_in_rocksdb is already set to {}", value);
|
||||
return Ok(());
|
||||
}
|
||||
settings.storages_history_in_rocksdb = value;
|
||||
println!("Set storages_history_in_rocksdb = {}", value);
|
||||
}
|
||||
SetCommand::TransactionHashNumbers { value } => {
|
||||
if settings.transaction_hash_numbers_in_rocksdb == value {
|
||||
println!("transaction_hash_numbers_in_rocksdb is already set to {}", value);
|
||||
return Ok(());
|
||||
}
|
||||
settings.transaction_hash_numbers_in_rocksdb = value;
|
||||
println!("Set transaction_hash_numbers_in_rocksdb = {}", value);
|
||||
}
|
||||
SetCommand::AccountHistory { value } => {
|
||||
if settings.account_history_in_rocksdb == value {
|
||||
println!("account_history_in_rocksdb is already set to {}", value);
|
||||
return Ok(());
|
||||
}
|
||||
settings.account_history_in_rocksdb = value;
|
||||
println!("Set account_history_in_rocksdb = {}", value);
|
||||
}
|
||||
SetCommand::StorageChangesets { value } => {
|
||||
if settings.storage_changesets_in_static_files == value {
|
||||
println!("storage_changesets_in_static_files is already set to {}", value);
|
||||
return Ok(());
|
||||
}
|
||||
settings.storage_changesets_in_static_files = value;
|
||||
println!("Set storage_changesets_in_static_files = {}", value);
|
||||
settings.storage_v2 = value;
|
||||
println!("Set storage_v2 = {}", value);
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -39,10 +39,6 @@ pub struct Command {
|
||||
/// Output format (table, json, csv)
|
||||
#[arg(long, short, default_value = "table")]
|
||||
format: OutputFormat,
|
||||
|
||||
/// Use hashed state tables (HashedStorages) instead of plain state
|
||||
#[arg(long)]
|
||||
hashed: bool,
|
||||
}
|
||||
|
||||
impl Command {
|
||||
@@ -67,57 +63,24 @@ impl Command {
|
||||
address: Address,
|
||||
limit: usize,
|
||||
) -> eyre::Result<()> {
|
||||
let use_hashed = self.hashed;
|
||||
let hashed_address = keccak256(address);
|
||||
let use_hashed_state = tool.provider_factory.cached_storage_settings().use_hashed_state();
|
||||
|
||||
let entries = tool.provider_factory.db_ref().view(|tx| {
|
||||
let account = if use_hashed {
|
||||
tx.get::<tables::HashedAccounts>(hashed_address)?
|
||||
} else {
|
||||
tx.get::<tables::PlainAccountState>(address)?
|
||||
};
|
||||
|
||||
let mut entries = Vec::new();
|
||||
let mut last_log = Instant::now();
|
||||
|
||||
if use_hashed {
|
||||
let (account, walker_entries) = if use_hashed_state {
|
||||
let hashed_address = keccak256(address);
|
||||
let account = tx.get::<tables::HashedAccounts>(hashed_address)?;
|
||||
let mut cursor = tx.cursor_dup_read::<tables::HashedStorages>()?;
|
||||
let walker = cursor.walk_dup(Some(hashed_address), None)?;
|
||||
let mut entries = Vec::new();
|
||||
let mut last_log = Instant::now();
|
||||
for (idx, entry) in walker.enumerate() {
|
||||
let (_, storage_entry) = entry?;
|
||||
|
||||
if storage_entry.value != U256::ZERO {
|
||||
entries.push((storage_entry.key, storage_entry.value));
|
||||
}
|
||||
|
||||
if entries.len() >= limit {
|
||||
break;
|
||||
}
|
||||
|
||||
if last_log.elapsed() >= LOG_INTERVAL {
|
||||
info!(
|
||||
target: "reth::cli",
|
||||
hashed_address = %hashed_address,
|
||||
slots_scanned = idx,
|
||||
"Scanning hashed storage slots"
|
||||
);
|
||||
last_log = Instant::now();
|
||||
}
|
||||
}
|
||||
} else {
|
||||
let mut cursor = tx.cursor_dup_read::<tables::PlainStorageState>()?;
|
||||
let walker = cursor.walk_dup(Some(address), None)?;
|
||||
for (idx, entry) in walker.enumerate() {
|
||||
let (_, storage_entry) = entry?;
|
||||
|
||||
if storage_entry.value != U256::ZERO {
|
||||
entries.push((storage_entry.key, storage_entry.value));
|
||||
}
|
||||
|
||||
if entries.len() >= limit {
|
||||
break;
|
||||
}
|
||||
|
||||
if last_log.elapsed() >= LOG_INTERVAL {
|
||||
info!(
|
||||
target: "reth::cli",
|
||||
@@ -128,14 +91,42 @@ impl Command {
|
||||
last_log = Instant::now();
|
||||
}
|
||||
}
|
||||
}
|
||||
(account, entries)
|
||||
} else {
|
||||
// Get account info
|
||||
let account = tx.get::<tables::PlainAccountState>(address)?;
|
||||
// Get storage entries
|
||||
let mut cursor = tx.cursor_dup_read::<tables::PlainStorageState>()?;
|
||||
let walker = cursor.walk_dup(Some(address), None)?;
|
||||
let mut entries = Vec::new();
|
||||
let mut last_log = Instant::now();
|
||||
for (idx, entry) in walker.enumerate() {
|
||||
let (_, storage_entry) = entry?;
|
||||
if storage_entry.value != U256::ZERO {
|
||||
entries.push((storage_entry.key, storage_entry.value));
|
||||
}
|
||||
if entries.len() >= limit {
|
||||
break;
|
||||
}
|
||||
if last_log.elapsed() >= LOG_INTERVAL {
|
||||
info!(
|
||||
target: "reth::cli",
|
||||
address = %address,
|
||||
slots_scanned = idx,
|
||||
"Scanning storage slots"
|
||||
);
|
||||
last_log = Instant::now();
|
||||
}
|
||||
}
|
||||
(account, entries)
|
||||
};
|
||||
|
||||
Ok::<_, eyre::Report>((account, entries))
|
||||
Ok::<_, eyre::Report>((account, walker_entries))
|
||||
})??;
|
||||
|
||||
let (account, storage_entries) = entries;
|
||||
|
||||
self.print_results(address, None, account, &storage_entries, use_hashed);
|
||||
self.print_results(address, None, account, &storage_entries);
|
||||
|
||||
Ok(())
|
||||
}
|
||||
@@ -154,7 +145,7 @@ impl Command {
|
||||
|
||||
// Check storage settings to determine where history is stored
|
||||
let storage_settings = tool.provider_factory.cached_storage_settings();
|
||||
let history_in_rocksdb = storage_settings.storages_history_in_rocksdb;
|
||||
let history_in_rocksdb = storage_settings.storage_v2;
|
||||
|
||||
// For historical queries, enumerate keys from history indices only
|
||||
// (not PlainStorageState, which reflects current state)
|
||||
@@ -211,7 +202,7 @@ impl Command {
|
||||
}
|
||||
}
|
||||
|
||||
self.print_results(address, Some(block), account, &entries, false);
|
||||
self.print_results(address, Some(block), account, &entries);
|
||||
|
||||
Ok(())
|
||||
}
|
||||
@@ -353,22 +344,15 @@ impl Command {
|
||||
block: Option<BlockNumber>,
|
||||
account: Option<reth_primitives_traits::Account>,
|
||||
storage: &[(alloy_primitives::B256, U256)],
|
||||
use_hashed: bool,
|
||||
) {
|
||||
let state_source = if use_hashed { "hashed" } else { "plain" };
|
||||
|
||||
match self.format {
|
||||
OutputFormat::Table => {
|
||||
println!("Account: {address}");
|
||||
if use_hashed {
|
||||
println!("Hashed address: {}", keccak256(address));
|
||||
}
|
||||
if let Some(b) = block {
|
||||
println!("Block: {b}");
|
||||
} else {
|
||||
println!("Block: latest");
|
||||
}
|
||||
println!("State source: {state_source}");
|
||||
println!();
|
||||
|
||||
if let Some(acc) = account {
|
||||
@@ -382,10 +366,9 @@ impl Command {
|
||||
}
|
||||
|
||||
println!();
|
||||
let slot_header = if use_hashed { "Hashed Slot" } else { "Slot" };
|
||||
println!("Storage ({} slots):", storage.len());
|
||||
println!("{:-<130}", "");
|
||||
println!("{:<66} | {:<64}", slot_header, "Value");
|
||||
println!("{:<66} | {:<64}", "Slot", "Value");
|
||||
println!("{:-<130}", "");
|
||||
for (key, value) in storage {
|
||||
println!("{key} | {value:#066x}");
|
||||
@@ -394,9 +377,7 @@ impl Command {
|
||||
OutputFormat::Json => {
|
||||
let output = serde_json::json!({
|
||||
"address": address.to_string(),
|
||||
"hashed_address": if use_hashed { Some(keccak256(address).to_string()) } else { None },
|
||||
"block": block,
|
||||
"state_source": state_source,
|
||||
"account": account.map(|a| serde_json::json!({
|
||||
"nonce": a.nonce,
|
||||
"balance": a.balance.to_string(),
|
||||
@@ -454,17 +435,5 @@ mod tests {
|
||||
let cmd = Command::try_parse_from(["state", "0xd8dA6BF26964aF9D7eEd9e03E53415D37aA96045"])
|
||||
.unwrap();
|
||||
assert_eq!(cmd.block, None);
|
||||
assert!(!cmd.hashed);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn parse_state_args_hashed() {
|
||||
let cmd = Command::try_parse_from([
|
||||
"state",
|
||||
"0xd8dA6BF26964aF9D7eEd9e03E53415D37aA96045",
|
||||
"--hashed",
|
||||
])
|
||||
.unwrap();
|
||||
assert!(cmd.hashed);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -37,6 +37,14 @@ pub struct DownloadDefaults {
|
||||
pub available_snapshots: Vec<Cow<'static, str>>,
|
||||
/// Default base URL for snapshots
|
||||
pub default_base_url: Cow<'static, str>,
|
||||
/// Default base URL for chain-aware snapshots.
|
||||
///
|
||||
/// When set, the chain ID is appended to form the full URL: `{base_url}/{chain_id}`.
|
||||
/// For example, given a base URL of `https://snapshots.example.com` and chain ID `1`,
|
||||
/// the resulting URL would be `https://snapshots.example.com/1`.
|
||||
///
|
||||
/// Falls back to [`default_base_url`](Self::default_base_url) when `None`.
|
||||
pub default_chain_aware_base_url: Option<Cow<'static, str>>,
|
||||
/// Optional custom long help text that overrides the generated help
|
||||
pub long_help: Option<String>,
|
||||
}
|
||||
@@ -60,6 +68,7 @@ impl DownloadDefaults {
|
||||
Cow::Borrowed("https://publicnode.com/snapshots (full nodes & testnets)"),
|
||||
],
|
||||
default_base_url: Cow::Borrowed(MERKLE_BASE_URL),
|
||||
default_chain_aware_base_url: None,
|
||||
long_help: None,
|
||||
}
|
||||
}
|
||||
@@ -84,9 +93,11 @@ impl DownloadDefaults {
|
||||
}
|
||||
|
||||
help.push_str(
|
||||
"\nIf no URL is provided, the latest mainnet archive snapshot\nwill be proposed for download from ",
|
||||
"\nIf no URL is provided, the latest archive snapshot for the selected chain\nwill be proposed for download from ",
|
||||
);
|
||||
help.push_str(
|
||||
self.default_chain_aware_base_url.as_deref().unwrap_or(&self.default_base_url),
|
||||
);
|
||||
help.push_str(self.default_base_url.as_ref());
|
||||
help.push_str(
|
||||
".\n\nLocal file:// URLs are also supported for extracting snapshots from disk.",
|
||||
);
|
||||
@@ -111,6 +122,12 @@ impl DownloadDefaults {
|
||||
self
|
||||
}
|
||||
|
||||
/// Set the default chain-aware base URL.
|
||||
pub fn with_chain_aware_base_url(mut self, url: impl Into<Cow<'static, str>>) -> Self {
|
||||
self.default_chain_aware_base_url = Some(url.into());
|
||||
self
|
||||
}
|
||||
|
||||
/// Builder: Set custom long help text, overriding the generated help
|
||||
pub fn with_long_help(mut self, help: impl Into<String>) -> Self {
|
||||
self.long_help = Some(help.into());
|
||||
@@ -142,7 +159,7 @@ impl<C: ChainSpecParser<ChainSpec: EthChainSpec + EthereumHardforks>> DownloadCo
|
||||
let url = match self.url {
|
||||
Some(url) => url,
|
||||
None => {
|
||||
let url = get_latest_snapshot_url().await?;
|
||||
let url = get_latest_snapshot_url(self.env.chain.chain().id()).await?;
|
||||
info!(target: "reth::cli", "Using default snapshot URL: {}", url);
|
||||
url
|
||||
}
|
||||
@@ -509,8 +526,12 @@ async fn stream_and_extract(url: &str, target_dir: &Path) -> Result<()> {
|
||||
}
|
||||
|
||||
// Builds default URL for latest mainnet archive snapshot using configured defaults
|
||||
async fn get_latest_snapshot_url() -> Result<String> {
|
||||
let base_url = &DownloadDefaults::get_global().default_base_url;
|
||||
async fn get_latest_snapshot_url(chain_id: u64) -> Result<String> {
|
||||
let defaults = DownloadDefaults::get_global();
|
||||
let base_url = match &defaults.default_chain_aware_base_url {
|
||||
Some(url) => format!("{url}/{chain_id}"),
|
||||
None => defaults.default_base_url.to_string(),
|
||||
};
|
||||
let latest_url = format!("{base_url}/latest.txt");
|
||||
let filename = Client::new()
|
||||
.get(latest_url)
|
||||
|
||||
@@ -10,8 +10,8 @@ use reth_node_builder::NodeBuilder;
|
||||
use reth_node_core::{
|
||||
args::{
|
||||
DatabaseArgs, DatadirArgs, DebugArgs, DevArgs, EngineArgs, EraArgs, MetricArgs,
|
||||
NetworkArgs, PayloadBuilderArgs, PruningArgs, RocksDbArgs, RpcServerArgs, StaticFilesArgs,
|
||||
StorageArgs, TxPoolArgs,
|
||||
NetworkArgs, PayloadBuilderArgs, PruningArgs, RpcServerArgs, StaticFilesArgs, StorageArgs,
|
||||
TxPoolArgs,
|
||||
},
|
||||
node_config::NodeConfig,
|
||||
version,
|
||||
@@ -103,10 +103,6 @@ pub struct NodeCommand<C: ChainSpecParser, Ext: clap::Args + fmt::Debug = NoArgs
|
||||
#[command(flatten)]
|
||||
pub pruning: PruningArgs,
|
||||
|
||||
/// All `RocksDB` table routing arguments
|
||||
#[command(flatten)]
|
||||
pub rocksdb: RocksDbArgs,
|
||||
|
||||
/// Engine cli arguments
|
||||
#[command(flatten, next_help_heading = "Engine")]
|
||||
pub engine: EngineArgs,
|
||||
@@ -119,8 +115,8 @@ pub struct NodeCommand<C: ChainSpecParser, Ext: clap::Args + fmt::Debug = NoArgs
|
||||
#[command(flatten, next_help_heading = "Static Files")]
|
||||
pub static_files: StaticFilesArgs,
|
||||
|
||||
/// Storage mode configuration (v2 vs v1/legacy)
|
||||
#[command(flatten)]
|
||||
/// All storage related arguments with --storage prefix
|
||||
#[command(flatten, next_help_heading = "Storage")]
|
||||
pub storage: StorageArgs,
|
||||
|
||||
/// Additional cli arguments
|
||||
@@ -175,7 +171,6 @@ where
|
||||
db,
|
||||
dev,
|
||||
pruning,
|
||||
rocksdb,
|
||||
engine,
|
||||
era,
|
||||
static_files,
|
||||
@@ -183,9 +178,6 @@ where
|
||||
ext,
|
||||
} = self;
|
||||
|
||||
// Validate RocksDB arguments
|
||||
rocksdb.validate()?;
|
||||
|
||||
// set up node config
|
||||
let mut node_config = NodeConfig {
|
||||
datadir,
|
||||
@@ -201,7 +193,6 @@ where
|
||||
db,
|
||||
dev,
|
||||
pruning,
|
||||
rocksdb,
|
||||
engine,
|
||||
era,
|
||||
static_files,
|
||||
|
||||
@@ -76,7 +76,7 @@ impl<C: ChainSpecParser<ChainSpec: EthChainSpec + EthereumHardforks>> PruneComma
|
||||
// Set up cancellation token for graceful shutdown on Ctrl+C
|
||||
let cancellation = CancellationToken::new();
|
||||
let cancellation_clone = cancellation.clone();
|
||||
ctx.task_executor.spawn_critical("prune-ctrl-c", async move {
|
||||
ctx.task_executor.spawn_critical_task("prune-ctrl-c", async move {
|
||||
tokio::signal::ctrl_c().await.expect("failed to listen for ctrl-c");
|
||||
cancellation_clone.cancel();
|
||||
});
|
||||
|
||||
@@ -9,7 +9,10 @@ use reth_db_api::{
|
||||
transaction::{DbTx, DbTxMut},
|
||||
};
|
||||
use reth_db_common::{
|
||||
init::{insert_genesis_header, insert_genesis_history, insert_genesis_state},
|
||||
init::{
|
||||
insert_genesis_account_history, insert_genesis_header, insert_genesis_state,
|
||||
insert_genesis_storage_history,
|
||||
},
|
||||
DbTool,
|
||||
};
|
||||
use reth_node_api::{HeaderTy, ReceiptTy, TxTy};
|
||||
@@ -42,12 +45,16 @@ impl<C: ChainSpecParser> Command<C> {
|
||||
|
||||
let tool = DbTool::new(provider_factory)?;
|
||||
|
||||
let static_file_segment = match self.stage {
|
||||
StageEnum::Headers => Some(StaticFileSegment::Headers),
|
||||
StageEnum::Bodies => Some(StaticFileSegment::Transactions),
|
||||
StageEnum::Execution => Some(StaticFileSegment::Receipts),
|
||||
StageEnum::Senders => Some(StaticFileSegment::TransactionSenders),
|
||||
_ => None,
|
||||
let static_file_segments = match self.stage {
|
||||
StageEnum::Headers => vec![StaticFileSegment::Headers],
|
||||
StageEnum::Bodies => vec![StaticFileSegment::Transactions],
|
||||
StageEnum::Execution => vec![
|
||||
StaticFileSegment::Receipts,
|
||||
StaticFileSegment::AccountChangeSets,
|
||||
StaticFileSegment::StorageChangeSets,
|
||||
],
|
||||
StageEnum::Senders => vec![StaticFileSegment::TransactionSenders],
|
||||
_ => vec![],
|
||||
};
|
||||
|
||||
// Calling `StaticFileProviderRW::prune_*` will instruct the writer to prune rows only
|
||||
@@ -55,35 +62,33 @@ impl<C: ChainSpecParser> Command<C> {
|
||||
// deleting the jar files, otherwise if the task were to be interrupted after we
|
||||
// have deleted them, BUT before we have committed the checkpoints to the database, we'd
|
||||
// lose essential data.
|
||||
if let Some(static_file_segment) = static_file_segment {
|
||||
let static_file_provider = tool.provider_factory.static_file_provider();
|
||||
if let Some(highest_block) =
|
||||
static_file_provider.get_highest_static_file_block(static_file_segment)
|
||||
let static_file_provider = tool.provider_factory.static_file_provider();
|
||||
for segment in static_file_segments {
|
||||
if let Some(highest_block) = static_file_provider.get_highest_static_file_block(segment)
|
||||
{
|
||||
let mut writer = static_file_provider.latest_writer(static_file_segment)?;
|
||||
let mut writer = static_file_provider.latest_writer(segment)?;
|
||||
|
||||
match static_file_segment {
|
||||
match segment {
|
||||
StaticFileSegment::Headers => {
|
||||
// Prune all headers leaving genesis intact.
|
||||
writer.prune_headers(highest_block)?;
|
||||
}
|
||||
StaticFileSegment::Transactions => {
|
||||
let to_delete = static_file_provider
|
||||
.get_highest_static_file_tx(static_file_segment)
|
||||
.get_highest_static_file_tx(segment)
|
||||
.map(|tx_num| tx_num + 1)
|
||||
.unwrap_or_default();
|
||||
writer.prune_transactions(to_delete, 0)?;
|
||||
}
|
||||
StaticFileSegment::Receipts => {
|
||||
let to_delete = static_file_provider
|
||||
.get_highest_static_file_tx(static_file_segment)
|
||||
.get_highest_static_file_tx(segment)
|
||||
.map(|tx_num| tx_num + 1)
|
||||
.unwrap_or_default();
|
||||
writer.prune_receipts(to_delete, 0)?;
|
||||
}
|
||||
StaticFileSegment::TransactionSenders => {
|
||||
let to_delete = static_file_provider
|
||||
.get_highest_static_file_tx(static_file_segment)
|
||||
.get_highest_static_file_tx(segment)
|
||||
.map(|tx_num| tx_num + 1)
|
||||
.unwrap_or_default();
|
||||
writer.prune_transaction_senders(to_delete, 0)?;
|
||||
@@ -128,8 +133,15 @@ impl<C: ChainSpecParser> Command<C> {
|
||||
reset_stage_checkpoint(tx, StageId::SenderRecovery)?;
|
||||
}
|
||||
StageEnum::Execution => {
|
||||
tx.clear::<tables::PlainAccountState>()?;
|
||||
tx.clear::<tables::PlainStorageState>()?;
|
||||
if provider_rw.cached_storage_settings().use_hashed_state() {
|
||||
tx.clear::<tables::HashedAccounts>()?;
|
||||
tx.clear::<tables::HashedStorages>()?;
|
||||
reset_stage_checkpoint(tx, StageId::AccountHashing)?;
|
||||
reset_stage_checkpoint(tx, StageId::StorageHashing)?;
|
||||
} else {
|
||||
tx.clear::<tables::PlainAccountState>()?;
|
||||
tx.clear::<tables::PlainStorageState>()?;
|
||||
}
|
||||
tx.clear::<tables::AccountChangeSets>()?;
|
||||
tx.clear::<tables::StorageChangeSets>()?;
|
||||
tx.clear::<tables::Bytecodes>()?;
|
||||
@@ -171,29 +183,42 @@ impl<C: ChainSpecParser> Command<C> {
|
||||
None,
|
||||
)?;
|
||||
}
|
||||
StageEnum::AccountHistory | StageEnum::StorageHistory => {
|
||||
StageEnum::AccountHistory => {
|
||||
let settings = provider_rw.cached_storage_settings();
|
||||
let rocksdb = tool.provider_factory.rocksdb_provider();
|
||||
|
||||
if settings.account_history_in_rocksdb {
|
||||
if settings.storage_v2 {
|
||||
rocksdb.clear::<tables::AccountsHistory>()?;
|
||||
} else {
|
||||
tx.clear::<tables::AccountsHistory>()?;
|
||||
}
|
||||
|
||||
if settings.storages_history_in_rocksdb {
|
||||
reset_stage_checkpoint(tx, StageId::IndexAccountHistory)?;
|
||||
|
||||
insert_genesis_account_history(
|
||||
&provider_rw,
|
||||
self.env.chain.genesis().alloc.iter(),
|
||||
)?;
|
||||
}
|
||||
StageEnum::StorageHistory => {
|
||||
let settings = provider_rw.cached_storage_settings();
|
||||
let rocksdb = tool.provider_factory.rocksdb_provider();
|
||||
|
||||
if settings.storage_v2 {
|
||||
rocksdb.clear::<tables::StoragesHistory>()?;
|
||||
} else {
|
||||
tx.clear::<tables::StoragesHistory>()?;
|
||||
}
|
||||
|
||||
reset_stage_checkpoint(tx, StageId::IndexAccountHistory)?;
|
||||
reset_stage_checkpoint(tx, StageId::IndexStorageHistory)?;
|
||||
|
||||
insert_genesis_history(&provider_rw, self.env.chain.genesis().alloc.iter())?;
|
||||
insert_genesis_storage_history(
|
||||
&provider_rw,
|
||||
self.env.chain.genesis().alloc.iter(),
|
||||
)?;
|
||||
}
|
||||
StageEnum::TxLookup => {
|
||||
if provider_rw.cached_storage_settings().transaction_hash_numbers_in_rocksdb {
|
||||
if provider_rw.cached_storage_settings().storage_v2 {
|
||||
tool.provider_factory
|
||||
.rocksdb_provider()
|
||||
.clear::<tables::TransactionHashNumbers>()?;
|
||||
|
||||
@@ -37,12 +37,14 @@ where
|
||||
unwind_and_copy(db_tool, from, tip_block_number, &output_db, evm_config.clone())?;
|
||||
|
||||
if should_run {
|
||||
let runtime = reth_tasks::Runtime::with_existing_handle(tokio::runtime::Handle::current())?;
|
||||
dry_run(
|
||||
ProviderFactory::<N>::new(
|
||||
output_db,
|
||||
db_tool.chain(),
|
||||
StaticFileProvider::read_write(output_datadir.static_files())?,
|
||||
RocksDBProvider::builder(output_datadir.rocksdb()).build()?,
|
||||
runtime,
|
||||
)?,
|
||||
to,
|
||||
from,
|
||||
|
||||
@@ -33,12 +33,14 @@ pub(crate) async fn dump_hashing_account_stage<N: ProviderNodeTypes<DB = Databas
|
||||
unwind_and_copy(db_tool, from, tip_block_number, &output_db)?;
|
||||
|
||||
if should_run {
|
||||
let runtime = reth_tasks::Runtime::with_existing_handle(tokio::runtime::Handle::current())?;
|
||||
dry_run(
|
||||
ProviderFactory::<N>::new(
|
||||
output_db,
|
||||
db_tool.chain(),
|
||||
StaticFileProvider::read_write(output_datadir.static_files())?,
|
||||
RocksDBProvider::builder(output_datadir.rocksdb()).build()?,
|
||||
runtime,
|
||||
)?,
|
||||
to,
|
||||
from,
|
||||
|
||||
@@ -23,12 +23,14 @@ pub(crate) async fn dump_hashing_storage_stage<N: ProviderNodeTypes<DB = Databas
|
||||
unwind_and_copy(db_tool, from, tip_block_number, &output_db)?;
|
||||
|
||||
if should_run {
|
||||
let runtime = reth_tasks::Runtime::with_existing_handle(tokio::runtime::Handle::current())?;
|
||||
dry_run(
|
||||
ProviderFactory::<N>::new(
|
||||
output_db,
|
||||
db_tool.chain(),
|
||||
StaticFileProvider::read_write(output_datadir.static_files())?,
|
||||
RocksDBProvider::builder(output_datadir.rocksdb()).build()?,
|
||||
runtime,
|
||||
)?,
|
||||
to,
|
||||
from,
|
||||
|
||||
@@ -57,12 +57,14 @@ where
|
||||
unwind_and_copy(db_tool, (from, to), tip_block_number, &output_db, evm_config, consensus)?;
|
||||
|
||||
if should_run {
|
||||
let runtime = reth_tasks::Runtime::with_existing_handle(tokio::runtime::Handle::current())?;
|
||||
dry_run(
|
||||
ProviderFactory::<N>::new(
|
||||
output_db,
|
||||
db_tool.chain(),
|
||||
StaticFileProvider::read_write(output_datadir.static_files())?,
|
||||
RocksDBProvider::builder(output_datadir.rocksdb()).build()?,
|
||||
runtime,
|
||||
)?,
|
||||
to,
|
||||
from,
|
||||
|
||||
@@ -10,9 +10,10 @@
|
||||
|
||||
//! Entrypoint for running commands.
|
||||
|
||||
use reth_tasks::{TaskExecutor, TaskManager};
|
||||
use reth_tasks::{PanickedTaskError, TaskExecutor};
|
||||
use std::{future::Future, pin::pin, sync::mpsc, time::Duration};
|
||||
use tracing::{debug, error, trace};
|
||||
use tokio::task::JoinHandle;
|
||||
use tracing::{debug, error, info};
|
||||
|
||||
/// Executes CLI commands.
|
||||
///
|
||||
@@ -20,21 +21,24 @@ use tracing::{debug, error, trace};
|
||||
#[derive(Debug)]
|
||||
pub struct CliRunner {
|
||||
config: CliRunnerConfig,
|
||||
tokio_runtime: tokio::runtime::Runtime,
|
||||
runtime: reth_tasks::Runtime,
|
||||
}
|
||||
|
||||
impl CliRunner {
|
||||
/// Attempts to create a new [`CliRunner`] using the default tokio
|
||||
/// [`Runtime`](tokio::runtime::Runtime).
|
||||
/// Attempts to create a new [`CliRunner`] using the default
|
||||
/// [`Runtime`](reth_tasks::Runtime).
|
||||
///
|
||||
/// The default tokio runtime is multi-threaded, with both I/O and time drivers enabled.
|
||||
pub fn try_default_runtime() -> Result<Self, std::io::Error> {
|
||||
Ok(Self { config: CliRunnerConfig::default(), tokio_runtime: tokio_runtime()? })
|
||||
/// The default runtime is multi-threaded, with both I/O and time drivers enabled.
|
||||
pub fn try_default_runtime() -> Result<Self, reth_tasks::RuntimeBuildError> {
|
||||
Self::try_with_runtime_config(reth_tasks::RuntimeConfig::default())
|
||||
}
|
||||
|
||||
/// Create a new [`CliRunner`] from a provided tokio [`Runtime`](tokio::runtime::Runtime).
|
||||
pub const fn from_runtime(tokio_runtime: tokio::runtime::Runtime) -> Self {
|
||||
Self { config: CliRunnerConfig::new(), tokio_runtime }
|
||||
/// Creates a new [`CliRunner`] with the given [`RuntimeConfig`](reth_tasks::RuntimeConfig).
|
||||
pub fn try_with_runtime_config(
|
||||
config: reth_tasks::RuntimeConfig,
|
||||
) -> Result<Self, reth_tasks::RuntimeBuildError> {
|
||||
let runtime = reth_tasks::RuntimeBuilder::new(config).build()?;
|
||||
Ok(Self { config: CliRunnerConfig::default(), runtime })
|
||||
}
|
||||
|
||||
/// Sets the [`CliRunnerConfig`] for this runner.
|
||||
@@ -48,7 +52,7 @@ impl CliRunner {
|
||||
where
|
||||
F: Future<Output = T>,
|
||||
{
|
||||
self.tokio_runtime.block_on(fut)
|
||||
self.runtime.handle().block_on(fut)
|
||||
}
|
||||
|
||||
/// Executes the given _async_ command on the tokio runtime until the command future resolves or
|
||||
@@ -64,12 +68,11 @@ impl CliRunner {
|
||||
F: Future<Output = Result<(), E>>,
|
||||
E: Send + Sync + From<std::io::Error> + From<reth_tasks::PanickedTaskError> + 'static,
|
||||
{
|
||||
let AsyncCliRunner { context, mut task_manager, tokio_runtime } =
|
||||
AsyncCliRunner::new(self.tokio_runtime);
|
||||
let (context, task_manager_handle) = cli_context(&self.runtime);
|
||||
|
||||
// Executes the command until it finished or ctrl-c was fired
|
||||
let command_res = tokio_runtime.block_on(run_to_completion_or_panic(
|
||||
&mut task_manager,
|
||||
let command_res = self.runtime.handle().block_on(run_to_completion_or_panic(
|
||||
task_manager_handle,
|
||||
run_until_ctrl_c(command(context)),
|
||||
));
|
||||
|
||||
@@ -77,13 +80,13 @@ impl CliRunner {
|
||||
error!(target: "reth::cli", "shutting down due to error");
|
||||
} else {
|
||||
debug!(target: "reth::cli", "shutting down gracefully");
|
||||
// after the command has finished or exit signal was received we shutdown the task
|
||||
// manager which fires the shutdown signal to all tasks spawned via the task
|
||||
// after the command has finished or exit signal was received we shutdown the
|
||||
// runtime which fires the shutdown signal to all tasks spawned via the task
|
||||
// executor and awaiting on tasks spawned with graceful shutdown
|
||||
task_manager.graceful_shutdown_with_timeout(self.config.graceful_shutdown_timeout);
|
||||
self.runtime.graceful_shutdown_with_timeout(self.config.graceful_shutdown_timeout);
|
||||
}
|
||||
|
||||
tokio_shutdown(tokio_runtime, true);
|
||||
runtime_shutdown(self.runtime, true);
|
||||
|
||||
command_res
|
||||
}
|
||||
@@ -99,17 +102,16 @@ impl CliRunner {
|
||||
F: Future<Output = Result<(), E>> + Send + 'static,
|
||||
E: Send + Sync + From<std::io::Error> + From<reth_tasks::PanickedTaskError> + 'static,
|
||||
{
|
||||
let AsyncCliRunner { context, mut task_manager, tokio_runtime } =
|
||||
AsyncCliRunner::new(self.tokio_runtime);
|
||||
let (context, task_manager_handle) = cli_context(&self.runtime);
|
||||
|
||||
// Spawn the command on the blocking thread pool
|
||||
let handle = tokio_runtime.handle().clone();
|
||||
let command_handle =
|
||||
tokio_runtime.handle().spawn_blocking(move || handle.block_on(command(context)));
|
||||
let handle = self.runtime.handle().clone();
|
||||
let handle2 = handle.clone();
|
||||
let command_handle = handle.spawn_blocking(move || handle2.block_on(command(context)));
|
||||
|
||||
// Wait for the command to complete or ctrl-c
|
||||
let command_res = tokio_runtime.block_on(run_to_completion_or_panic(
|
||||
&mut task_manager,
|
||||
let command_res = self.runtime.handle().block_on(run_to_completion_or_panic(
|
||||
task_manager_handle,
|
||||
run_until_ctrl_c(
|
||||
async move { command_handle.await.expect("Failed to join blocking task") },
|
||||
),
|
||||
@@ -119,10 +121,10 @@ impl CliRunner {
|
||||
error!(target: "reth::cli", "shutting down due to error");
|
||||
} else {
|
||||
debug!(target: "reth::cli", "shutting down gracefully");
|
||||
task_manager.graceful_shutdown_with_timeout(self.config.graceful_shutdown_timeout);
|
||||
self.runtime.graceful_shutdown_with_timeout(self.config.graceful_shutdown_timeout);
|
||||
}
|
||||
|
||||
tokio_shutdown(tokio_runtime, true);
|
||||
runtime_shutdown(self.runtime, true);
|
||||
|
||||
command_res
|
||||
}
|
||||
@@ -133,48 +135,40 @@ impl CliRunner {
|
||||
F: Future<Output = Result<(), E>>,
|
||||
E: Send + Sync + From<std::io::Error> + 'static,
|
||||
{
|
||||
self.tokio_runtime.block_on(run_until_ctrl_c(fut))?;
|
||||
self.runtime.handle().block_on(run_until_ctrl_c(fut))?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Executes a regular future as a spawned blocking task until completion or until external
|
||||
/// signal received.
|
||||
///
|
||||
/// See [`Runtime::spawn_blocking`](tokio::runtime::Runtime::spawn_blocking) .
|
||||
/// See [`Runtime::spawn_blocking`](tokio::runtime::Runtime::spawn_blocking).
|
||||
pub fn run_blocking_until_ctrl_c<F, E>(self, fut: F) -> Result<(), E>
|
||||
where
|
||||
F: Future<Output = Result<(), E>> + Send + 'static,
|
||||
E: Send + Sync + From<std::io::Error> + 'static,
|
||||
{
|
||||
let tokio_runtime = self.tokio_runtime;
|
||||
let handle = tokio_runtime.handle().clone();
|
||||
let fut = tokio_runtime.handle().spawn_blocking(move || handle.block_on(fut));
|
||||
tokio_runtime
|
||||
let handle = self.runtime.handle().clone();
|
||||
let handle2 = handle.clone();
|
||||
let fut = handle.spawn_blocking(move || handle2.block_on(fut));
|
||||
self.runtime
|
||||
.handle()
|
||||
.block_on(run_until_ctrl_c(async move { fut.await.expect("Failed to join task") }))?;
|
||||
|
||||
tokio_shutdown(tokio_runtime, false);
|
||||
runtime_shutdown(self.runtime, false);
|
||||
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
/// [`CliRunner`] configuration when executing commands asynchronously
|
||||
struct AsyncCliRunner {
|
||||
context: CliContext,
|
||||
task_manager: TaskManager,
|
||||
tokio_runtime: tokio::runtime::Runtime,
|
||||
}
|
||||
|
||||
// === impl AsyncCliRunner ===
|
||||
|
||||
impl AsyncCliRunner {
|
||||
/// Given a tokio [`Runtime`](tokio::runtime::Runtime), creates additional context required to
|
||||
/// execute commands asynchronously.
|
||||
fn new(tokio_runtime: tokio::runtime::Runtime) -> Self {
|
||||
let task_manager = TaskManager::new(tokio_runtime.handle().clone());
|
||||
let task_executor = task_manager.executor();
|
||||
Self { context: CliContext { task_executor }, task_manager, tokio_runtime }
|
||||
}
|
||||
/// Extracts the task manager handle from the runtime and creates the [`CliContext`].
|
||||
fn cli_context(
|
||||
runtime: &reth_tasks::Runtime,
|
||||
) -> (CliContext, JoinHandle<Result<(), PanickedTaskError>>) {
|
||||
let handle =
|
||||
runtime.take_task_manager_handle().expect("Runtime must contain a TaskManager handle");
|
||||
let context = CliContext { task_executor: runtime.clone() };
|
||||
(context, handle)
|
||||
}
|
||||
|
||||
/// Additional context provided by the [`CliRunner`] when executing commands
|
||||
@@ -216,37 +210,25 @@ impl CliRunnerConfig {
|
||||
}
|
||||
}
|
||||
|
||||
/// Creates a new default tokio multi-thread [Runtime](tokio::runtime::Runtime) with all features
|
||||
/// enabled
|
||||
pub fn tokio_runtime() -> Result<tokio::runtime::Runtime, std::io::Error> {
|
||||
tokio::runtime::Builder::new_multi_thread()
|
||||
.enable_all()
|
||||
// Keep the threads alive for at least the block time (12 seconds) plus buffer.
|
||||
// This prevents the costly process of spawning new threads on every
|
||||
// new block, and instead reuses the existing threads.
|
||||
.thread_keep_alive(Duration::from_secs(15))
|
||||
.thread_name("tokio-rt")
|
||||
.build()
|
||||
}
|
||||
|
||||
/// Runs the given future to completion or until a critical task panicked.
|
||||
///
|
||||
/// Returns the error if a task panicked, or the given future returned an error.
|
||||
async fn run_to_completion_or_panic<F, E>(tasks: &mut TaskManager, fut: F) -> Result<(), E>
|
||||
async fn run_to_completion_or_panic<F, E>(
|
||||
task_manager_handle: JoinHandle<Result<(), PanickedTaskError>>,
|
||||
fut: F,
|
||||
) -> Result<(), E>
|
||||
where
|
||||
F: Future<Output = Result<(), E>>,
|
||||
E: Send + Sync + From<reth_tasks::PanickedTaskError> + 'static,
|
||||
{
|
||||
{
|
||||
let fut = pin!(fut);
|
||||
tokio::select! {
|
||||
task_manager_result = tasks => {
|
||||
if let Err(panicked_error) = task_manager_result {
|
||||
return Err(panicked_error.into());
|
||||
}
|
||||
},
|
||||
res = fut => res?,
|
||||
}
|
||||
let fut = pin!(fut);
|
||||
tokio::select! {
|
||||
task_manager_result = task_manager_handle => {
|
||||
if let Ok(Err(panicked_error)) = task_manager_result {
|
||||
return Err(panicked_error.into());
|
||||
}
|
||||
},
|
||||
res = fut => res?,
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
@@ -271,10 +253,10 @@ where
|
||||
|
||||
tokio::select! {
|
||||
_ = ctrl_c => {
|
||||
trace!(target: "reth::cli", "Received ctrl-c");
|
||||
info!(target: "reth::cli", "Received ctrl-c");
|
||||
},
|
||||
_ = sigterm => {
|
||||
trace!(target: "reth::cli", "Received SIGTERM");
|
||||
info!(target: "reth::cli", "Received SIGTERM");
|
||||
},
|
||||
res = fut => res?,
|
||||
}
|
||||
@@ -287,7 +269,7 @@ where
|
||||
|
||||
tokio::select! {
|
||||
_ = ctrl_c => {
|
||||
trace!(target: "reth::cli", "Received ctrl-c");
|
||||
info!(target: "reth::cli", "Received ctrl-c");
|
||||
},
|
||||
res = fut => res?,
|
||||
}
|
||||
@@ -296,17 +278,17 @@ where
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Shut down the given Tokio runtime, and wait for it if `wait` is set.
|
||||
/// Default timeout for waiting on the tokio runtime to shut down.
|
||||
const DEFAULT_RUNTIME_SHUTDOWN_TIMEOUT: Duration = Duration::from_secs(5);
|
||||
|
||||
/// Shut down the given [`Runtime`](reth_tasks::Runtime), and wait for it if `wait` is set.
|
||||
///
|
||||
/// `drop(tokio_runtime)` would block the current thread until its pools
|
||||
/// (including blocking pool) are shutdown. Since we want to exit as soon as possible, drop
|
||||
/// it on a separate thread and wait for up to 5 seconds for this operation to
|
||||
/// complete.
|
||||
fn tokio_shutdown(rt: tokio::runtime::Runtime, wait: bool) {
|
||||
// Shutdown the runtime on a separate thread
|
||||
/// Dropping the runtime on the current thread could block due to tokio pool teardown.
|
||||
/// Instead, we drop it on a separate thread and optionally wait for completion.
|
||||
fn runtime_shutdown(rt: reth_tasks::Runtime, wait: bool) {
|
||||
let (tx, rx) = mpsc::channel();
|
||||
std::thread::Builder::new()
|
||||
.name("tokio-shutdown".to_string())
|
||||
.name("rt-shutdown".to_string())
|
||||
.spawn(move || {
|
||||
drop(rt);
|
||||
let _ = tx.send(());
|
||||
@@ -314,8 +296,8 @@ fn tokio_shutdown(rt: tokio::runtime::Runtime, wait: bool) {
|
||||
.unwrap();
|
||||
|
||||
if wait {
|
||||
let _ = rx.recv_timeout(Duration::from_secs(5)).inspect_err(|err| {
|
||||
debug!(target: "reth::cli", %err, "tokio runtime shutdown timed out");
|
||||
let _ = rx.recv_timeout(DEFAULT_RUNTIME_SHUTDOWN_TIMEOUT).inspect_err(|err| {
|
||||
tracing::warn!(target: "reth::cli", %err, "runtime shutdown timed out");
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
@@ -11,7 +11,6 @@ use reth_node_builder::{
|
||||
PayloadTypes,
|
||||
};
|
||||
use reth_provider::providers::{BlockchainProvider, NodeTypesForProvider};
|
||||
use reth_tasks::TaskManager;
|
||||
use std::sync::Arc;
|
||||
use wallet::Wallet;
|
||||
|
||||
@@ -50,7 +49,7 @@ pub async fn setup<N>(
|
||||
chain_spec: Arc<N::ChainSpec>,
|
||||
is_dev: bool,
|
||||
attributes_generator: impl Fn(u64) -> <<N as NodeTypes>::Payload as PayloadTypes>::PayloadBuilderAttributes + Send + Sync + Copy + 'static,
|
||||
) -> eyre::Result<(Vec<NodeHelperType<N>>, TaskManager, Wallet)>
|
||||
) -> eyre::Result<(Vec<NodeHelperType<N>>, Wallet)>
|
||||
where
|
||||
N: NodeBuilderHelper,
|
||||
{
|
||||
@@ -69,7 +68,6 @@ pub async fn setup_engine<N>(
|
||||
attributes_generator: impl Fn(u64) -> <<N as NodeTypes>::Payload as PayloadTypes>::PayloadBuilderAttributes + Send + Sync + Copy + 'static,
|
||||
) -> eyre::Result<(
|
||||
Vec<NodeHelperType<N, BlockchainProvider<NodeTypesWithDBAdapter<N, TmpDB>>>>,
|
||||
TaskManager,
|
||||
Wallet,
|
||||
)>
|
||||
where
|
||||
@@ -96,7 +94,6 @@ pub async fn setup_engine_with_connection<N>(
|
||||
connect_nodes: bool,
|
||||
) -> eyre::Result<(
|
||||
Vec<NodeHelperType<N, BlockchainProvider<NodeTypesWithDBAdapter<N, TmpDB>>>>,
|
||||
TaskManager,
|
||||
Wallet,
|
||||
)>
|
||||
where
|
||||
|
||||
@@ -14,7 +14,7 @@ use reth_node_core::args::{DiscoveryArgs, NetworkArgs, RpcServerArgs};
|
||||
use reth_primitives_traits::AlloyBlockHeader;
|
||||
use reth_provider::providers::BlockchainProvider;
|
||||
use reth_rpc_server_types::RpcModuleSelection;
|
||||
use reth_tasks::TaskManager;
|
||||
use reth_tasks::Runtime;
|
||||
use std::sync::Arc;
|
||||
use tracing::{span, Instrument, Level};
|
||||
|
||||
@@ -110,11 +110,9 @@ where
|
||||
self,
|
||||
) -> eyre::Result<(
|
||||
Vec<NodeHelperType<N, BlockchainProvider<NodeTypesWithDBAdapter<N, TmpDB>>>>,
|
||||
TaskManager,
|
||||
Wallet,
|
||||
)> {
|
||||
let tasks = TaskManager::current();
|
||||
let exec = tasks.executor();
|
||||
let runtime = Runtime::with_existing_handle(tokio::runtime::Handle::current())?;
|
||||
|
||||
let network_config = NetworkArgs {
|
||||
discovery: DiscoveryArgs { disable_discovery: true, ..DiscoveryArgs::default() },
|
||||
@@ -153,7 +151,7 @@ where
|
||||
let span = span!(Level::INFO, "node", idx);
|
||||
let node = N::default();
|
||||
let NodeHandle { node, node_exit_future: _ } = NodeBuilder::new(node_config)
|
||||
.testing_node(exec.clone())
|
||||
.testing_node(runtime.clone())
|
||||
.with_types_and_provider::<N, BlockchainProvider<_>>()
|
||||
.with_components(node.components_builder())
|
||||
.with_add_ons(node.add_ons())
|
||||
@@ -197,7 +195,7 @@ where
|
||||
}
|
||||
}
|
||||
|
||||
Ok((nodes, tasks, Wallet::default().with_chain_id(self.chain_spec.chain().into())))
|
||||
Ok((nodes, Wallet::default().with_chain_id(self.chain_spec.chain().into())))
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -15,7 +15,7 @@ use reth_provider::{
|
||||
};
|
||||
use reth_rpc_server_types::RpcModuleSelection;
|
||||
use reth_stages_types::StageId;
|
||||
use reth_tasks::TaskManager;
|
||||
use reth_tasks::Runtime;
|
||||
use std::{path::Path, sync::Arc};
|
||||
use tempfile::TempDir;
|
||||
use tracing::{debug, info, span, Level};
|
||||
@@ -24,8 +24,6 @@ use tracing::{debug, info, span, Level};
|
||||
pub struct ChainImportResult {
|
||||
/// The nodes that were created
|
||||
pub nodes: Vec<NodeHelperType<EthereumNode>>,
|
||||
/// The task manager
|
||||
pub task_manager: TaskManager,
|
||||
/// The wallet for testing
|
||||
pub wallet: Wallet,
|
||||
/// Temporary directories that must be kept alive for the duration of the test
|
||||
@@ -68,8 +66,7 @@ pub async fn setup_engine_with_chain_import(
|
||||
+ Copy
|
||||
+ 'static,
|
||||
) -> eyre::Result<ChainImportResult> {
|
||||
let tasks = TaskManager::current();
|
||||
let exec = tasks.executor();
|
||||
let runtime = Runtime::with_existing_handle(tokio::runtime::Handle::current())?;
|
||||
|
||||
let network_config = NetworkArgs {
|
||||
discovery: DiscoveryArgs { disable_discovery: true, ..DiscoveryArgs::default() },
|
||||
@@ -129,6 +126,7 @@ pub async fn setup_engine_with_chain_import(
|
||||
.with_default_tables()
|
||||
.build()
|
||||
.unwrap(),
|
||||
reth_tasks::Runtime::test(),
|
||||
)?;
|
||||
|
||||
// Initialize genesis if needed
|
||||
@@ -221,7 +219,7 @@ pub async fn setup_engine_with_chain_import(
|
||||
let node = EthereumNode::default();
|
||||
|
||||
let NodeHandle { node, node_exit_future: _ } = NodeBuilder::new(node_config.clone())
|
||||
.testing_node_with_datadir(exec.clone(), datadir.clone())
|
||||
.testing_node_with_datadir(runtime.clone(), datadir.clone())
|
||||
.with_types_and_provider::<EthereumNode, BlockchainProvider<_>>()
|
||||
.with_components(node.components_builder())
|
||||
.with_add_ons(node.add_ons())
|
||||
@@ -243,7 +241,6 @@ pub async fn setup_engine_with_chain_import(
|
||||
|
||||
Ok(ChainImportResult {
|
||||
nodes,
|
||||
task_manager: tasks,
|
||||
wallet: crate::Wallet::default().with_chain_id(chain_spec.chain.id()),
|
||||
_temp_dirs: temp_dirs,
|
||||
})
|
||||
@@ -333,6 +330,7 @@ mod tests {
|
||||
.with_default_tables()
|
||||
.build()
|
||||
.unwrap(),
|
||||
reth_tasks::Runtime::test(),
|
||||
)
|
||||
.expect("failed to create provider factory");
|
||||
|
||||
@@ -397,6 +395,7 @@ mod tests {
|
||||
.with_default_tables()
|
||||
.build()
|
||||
.unwrap(),
|
||||
reth_tasks::Runtime::test(),
|
||||
)
|
||||
.expect("failed to create provider factory");
|
||||
|
||||
@@ -497,6 +496,7 @@ mod tests {
|
||||
.with_default_tables()
|
||||
.build()
|
||||
.unwrap(),
|
||||
reth_tasks::Runtime::test(),
|
||||
)
|
||||
.expect("failed to create provider factory");
|
||||
|
||||
|
||||
@@ -1,6 +1,6 @@
|
||||
//! Test setup utilities for configuring the initial state.
|
||||
|
||||
use crate::{setup_engine_with_connection, testsuite::Environment, NodeBuilderHelper};
|
||||
use crate::{testsuite::Environment, E2ETestSetupBuilder, NodeBuilderHelper};
|
||||
use alloy_eips::BlockNumberOrTag;
|
||||
use alloy_primitives::B256;
|
||||
use alloy_rpc_types_engine::{ForkchoiceState, PayloadAttributes};
|
||||
@@ -38,6 +38,8 @@ pub struct Setup<I> {
|
||||
shutdown_tx: Option<mpsc::Sender<()>>,
|
||||
/// Is this setup in dev mode
|
||||
pub is_dev: bool,
|
||||
/// Whether to use v2 storage mode (hashed keys, static file changesets, rocksdb history)
|
||||
pub storage_v2: bool,
|
||||
/// Tracks instance generic.
|
||||
_phantom: PhantomData<I>,
|
||||
/// Holds the import result to keep nodes alive when using imported chain
|
||||
@@ -58,6 +60,7 @@ impl<I> Default for Setup<I> {
|
||||
tree_config: TreeConfig::default(),
|
||||
shutdown_tx: None,
|
||||
is_dev: true,
|
||||
storage_v2: false,
|
||||
_phantom: Default::default(),
|
||||
import_result_holder: None,
|
||||
import_rlp_path: None,
|
||||
@@ -126,6 +129,12 @@ where
|
||||
self
|
||||
}
|
||||
|
||||
/// Enable v2 storage mode (hashed keys, static file changesets, rocksdb history)
|
||||
pub const fn with_storage_v2(mut self) -> Self {
|
||||
self.storage_v2 = true;
|
||||
self
|
||||
}
|
||||
|
||||
/// Apply setup using pre-imported chain data from RLP file
|
||||
pub async fn apply_with_import<N>(
|
||||
&mut self,
|
||||
@@ -194,23 +203,32 @@ where
|
||||
self.shutdown_tx = Some(shutdown_tx);
|
||||
|
||||
let is_dev = self.is_dev;
|
||||
let storage_v2 = self.storage_v2;
|
||||
let node_count = self.network.node_count;
|
||||
let tree_config = self.tree_config.clone();
|
||||
|
||||
let attributes_generator = Self::create_static_attributes_generator::<N>();
|
||||
|
||||
let result = setup_engine_with_connection::<N>(
|
||||
let mut builder = E2ETestSetupBuilder::<N, _>::new(
|
||||
node_count,
|
||||
Arc::<N::ChainSpec>::new((*chain_spec).clone().into()),
|
||||
is_dev,
|
||||
self.tree_config.clone(),
|
||||
attributes_generator,
|
||||
self.network.connect_nodes,
|
||||
)
|
||||
.await;
|
||||
.with_tree_config_modifier(move |base| {
|
||||
tree_config.clone().with_cross_block_cache_size(base.cross_block_cache_size())
|
||||
})
|
||||
.with_node_config_modifier(move |config| config.set_dev(is_dev))
|
||||
.with_connect_nodes(self.network.connect_nodes);
|
||||
|
||||
if storage_v2 {
|
||||
builder = builder.with_storage_v2();
|
||||
}
|
||||
|
||||
let result = builder.build().await;
|
||||
|
||||
let mut node_clients = Vec::new();
|
||||
match result {
|
||||
Ok((nodes, executor, _wallet)) => {
|
||||
Ok((nodes, _wallet)) => {
|
||||
// create HTTP clients for each node's RPC and Engine API endpoints
|
||||
for node in &nodes {
|
||||
node_clients.push(node.to_node_client()?);
|
||||
@@ -218,12 +236,11 @@ where
|
||||
|
||||
// spawn a separate task just to handle the shutdown
|
||||
tokio::spawn(async move {
|
||||
// keep nodes and executor in scope to ensure they're not dropped
|
||||
// keep nodes in scope to ensure they're not dropped
|
||||
let _nodes = nodes;
|
||||
let _executor = executor;
|
||||
// Wait for shutdown signal
|
||||
let _ = shutdown_rx.recv().await;
|
||||
// nodes and executor will be dropped here when the test completes
|
||||
// nodes will be dropped here when the test completes
|
||||
});
|
||||
}
|
||||
Err(e) => {
|
||||
|
||||
@@ -370,15 +370,14 @@ async fn test_setup_builder_with_custom_tree_config() -> Result<()> {
|
||||
.build(),
|
||||
);
|
||||
|
||||
let (nodes, _tasks, _wallet) =
|
||||
E2ETestSetupBuilder::<EthereumNode, _>::new(1, chain_spec, |_| {
|
||||
EthPayloadBuilderAttributes::default()
|
||||
})
|
||||
.with_tree_config_modifier(|config| {
|
||||
config.with_persistence_threshold(0).with_memory_block_buffer_target(5)
|
||||
})
|
||||
.build()
|
||||
.await?;
|
||||
let (nodes, _wallet) = E2ETestSetupBuilder::<EthereumNode, _>::new(1, chain_spec, |_| {
|
||||
EthPayloadBuilderAttributes::default()
|
||||
})
|
||||
.with_tree_config_modifier(|config| {
|
||||
config.with_persistence_threshold(0).with_memory_block_buffer_target(5)
|
||||
})
|
||||
.build()
|
||||
.await?;
|
||||
|
||||
assert_eq!(nodes.len(), 1);
|
||||
|
||||
|
||||
@@ -10,7 +10,6 @@ use jsonrpsee::core::client::ClientT;
|
||||
use reth_chainspec::{ChainSpec, ChainSpecBuilder, MAINNET};
|
||||
use reth_db::tables;
|
||||
use reth_e2e_test_utils::{transaction::TransactionTestContext, wallet, E2ETestSetupBuilder};
|
||||
use reth_node_core::args::RocksDbArgs;
|
||||
use reth_node_ethereum::EthereumNode;
|
||||
use reth_payload_builder::EthPayloadBuilderAttributes;
|
||||
use reth_provider::RocksDBProviderFactory;
|
||||
@@ -96,22 +95,6 @@ fn test_attributes_generator(timestamp: u64) -> EthPayloadBuilderAttributes {
|
||||
EthPayloadBuilderAttributes::new(B256::ZERO, attributes)
|
||||
}
|
||||
|
||||
/// Verifies that `RocksDB` CLI defaults are `None` (deferred to storage mode).
|
||||
#[test]
|
||||
fn test_rocksdb_defaults_are_none() {
|
||||
let args = RocksDbArgs::default();
|
||||
|
||||
assert!(args.tx_hash.is_none(), "tx_hash default should be None (deferred to --storage.v2)");
|
||||
assert!(
|
||||
args.storages_history.is_none(),
|
||||
"storages_history default should be None (deferred to --storage.v2)"
|
||||
);
|
||||
assert!(
|
||||
args.account_history.is_none(),
|
||||
"account_history default should be None (deferred to --storage.v2)"
|
||||
);
|
||||
}
|
||||
|
||||
/// Smoke test: node boots with `RocksDB` routing enabled.
|
||||
#[tokio::test]
|
||||
async fn test_rocksdb_node_startup() -> Result<()> {
|
||||
@@ -119,7 +102,7 @@ async fn test_rocksdb_node_startup() -> Result<()> {
|
||||
|
||||
let chain_spec = test_chain_spec();
|
||||
|
||||
let (nodes, _tasks, _wallet) =
|
||||
let (nodes, _wallet) =
|
||||
E2ETestSetupBuilder::<EthereumNode, _>::new(1, chain_spec, test_attributes_generator)
|
||||
.with_storage_v2()
|
||||
.build()
|
||||
@@ -147,7 +130,7 @@ async fn test_rocksdb_block_mining() -> Result<()> {
|
||||
let chain_spec = test_chain_spec();
|
||||
let chain_id = chain_spec.chain().id();
|
||||
|
||||
let (mut nodes, _tasks, _wallet) =
|
||||
let (mut nodes, _wallet) =
|
||||
E2ETestSetupBuilder::<EthereumNode, _>::new(1, chain_spec, test_attributes_generator)
|
||||
.with_storage_v2()
|
||||
.build()
|
||||
@@ -201,7 +184,7 @@ async fn test_rocksdb_transaction_queries() -> Result<()> {
|
||||
let chain_spec = test_chain_spec();
|
||||
let chain_id = chain_spec.chain().id();
|
||||
|
||||
let (mut nodes, _tasks, _) = E2ETestSetupBuilder::<EthereumNode, _>::new(
|
||||
let (mut nodes, _) = E2ETestSetupBuilder::<EthereumNode, _>::new(
|
||||
1,
|
||||
chain_spec.clone(),
|
||||
test_attributes_generator,
|
||||
@@ -268,7 +251,7 @@ async fn test_rocksdb_multi_tx_same_block() -> Result<()> {
|
||||
let chain_spec = test_chain_spec();
|
||||
let chain_id = chain_spec.chain().id();
|
||||
|
||||
let (mut nodes, _tasks, _) = E2ETestSetupBuilder::<EthereumNode, _>::new(
|
||||
let (mut nodes, _) = E2ETestSetupBuilder::<EthereumNode, _>::new(
|
||||
1,
|
||||
chain_spec.clone(),
|
||||
test_attributes_generator,
|
||||
@@ -336,7 +319,7 @@ async fn test_rocksdb_txs_across_blocks() -> Result<()> {
|
||||
let chain_spec = test_chain_spec();
|
||||
let chain_id = chain_spec.chain().id();
|
||||
|
||||
let (mut nodes, _tasks, _) = E2ETestSetupBuilder::<EthereumNode, _>::new(
|
||||
let (mut nodes, _) = E2ETestSetupBuilder::<EthereumNode, _>::new(
|
||||
1,
|
||||
chain_spec.clone(),
|
||||
test_attributes_generator,
|
||||
@@ -421,7 +404,7 @@ async fn test_rocksdb_pending_tx_not_in_storage() -> Result<()> {
|
||||
let chain_spec = test_chain_spec();
|
||||
let chain_id = chain_spec.chain().id();
|
||||
|
||||
let (mut nodes, _tasks, _) = E2ETestSetupBuilder::<EthereumNode, _>::new(
|
||||
let (mut nodes, _) = E2ETestSetupBuilder::<EthereumNode, _>::new(
|
||||
1,
|
||||
chain_spec.clone(),
|
||||
test_attributes_generator,
|
||||
@@ -477,7 +460,7 @@ async fn test_rocksdb_pending_tx_not_in_storage() -> Result<()> {
|
||||
///
|
||||
/// This test exercises `unwind_trie_state_from` which previously failed with
|
||||
/// `UnsortedInput` errors because it read changesets directly from MDBX tables
|
||||
/// instead of using storage-aware methods that check `storage_changesets_in_static_files`.
|
||||
/// instead of using storage-aware methods that check `is_v2()`.
|
||||
#[tokio::test]
|
||||
async fn test_rocksdb_reorg_unwind() -> Result<()> {
|
||||
reth_tracing::init_test_tracing();
|
||||
@@ -485,7 +468,7 @@ async fn test_rocksdb_reorg_unwind() -> Result<()> {
|
||||
let chain_spec = test_chain_spec();
|
||||
let chain_id = chain_spec.chain().id();
|
||||
|
||||
let (mut nodes, _tasks, _) = E2ETestSetupBuilder::<EthereumNode, _>::new(
|
||||
let (mut nodes, _) = E2ETestSetupBuilder::<EthereumNode, _>::new(
|
||||
1,
|
||||
chain_spec.clone(),
|
||||
test_attributes_generator,
|
||||
|
||||
@@ -1,6 +1,7 @@
|
||||
//! Engine tree configuration.
|
||||
|
||||
use alloy_eips::merge::EPOCH_SLOTS;
|
||||
use core::time::Duration;
|
||||
|
||||
/// Triggers persistence when the number of canonical blocks in memory exceeds this threshold.
|
||||
pub const DEFAULT_PERSISTENCE_THRESHOLD: u64 = 2;
|
||||
@@ -64,6 +65,9 @@ pub const DEFAULT_SPARSE_TRIE_PRUNE_DEPTH: usize = 4;
|
||||
/// Storage tries beyond this limit are cleared (but allocations preserved).
|
||||
pub const DEFAULT_SPARSE_TRIE_MAX_STORAGE_TRIES: usize = 100;
|
||||
|
||||
/// Default timeout for the state root task before spawning a sequential fallback.
|
||||
pub const DEFAULT_STATE_ROOT_TASK_TIMEOUT: Duration = Duration::from_secs(1);
|
||||
|
||||
const DEFAULT_BLOCK_BUFFER_LIMIT: u32 = EPOCH_SLOTS as u32 * 2;
|
||||
const DEFAULT_MAX_INVALID_HEADER_CACHE_LENGTH: u32 = 256;
|
||||
const DEFAULT_MAX_EXECUTE_BLOCK_BATCH_SIZE: usize = 4;
|
||||
@@ -175,6 +179,13 @@ pub struct TreeConfig {
|
||||
sparse_trie_prune_depth: usize,
|
||||
/// Maximum number of storage tries to retain after pruning.
|
||||
sparse_trie_max_storage_tries: usize,
|
||||
/// Whether to fully disable sparse trie cache pruning between blocks.
|
||||
disable_sparse_trie_cache_pruning: bool,
|
||||
/// Timeout for the state root task before spawning a sequential fallback computation.
|
||||
/// If `Some`, after waiting this duration for the state root task, a sequential state root
|
||||
/// computation is spawned in parallel and whichever finishes first is used.
|
||||
/// If `None`, the timeout fallback is disabled.
|
||||
state_root_task_timeout: Option<Duration>,
|
||||
}
|
||||
|
||||
impl Default for TreeConfig {
|
||||
@@ -207,6 +218,8 @@ impl Default for TreeConfig {
|
||||
disable_trie_cache: false,
|
||||
sparse_trie_prune_depth: DEFAULT_SPARSE_TRIE_PRUNE_DEPTH,
|
||||
sparse_trie_max_storage_tries: DEFAULT_SPARSE_TRIE_MAX_STORAGE_TRIES,
|
||||
disable_sparse_trie_cache_pruning: false,
|
||||
state_root_task_timeout: Some(DEFAULT_STATE_ROOT_TASK_TIMEOUT),
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -241,6 +254,7 @@ impl TreeConfig {
|
||||
disable_cache_metrics: bool,
|
||||
sparse_trie_prune_depth: usize,
|
||||
sparse_trie_max_storage_tries: usize,
|
||||
state_root_task_timeout: Option<Duration>,
|
||||
) -> Self {
|
||||
Self {
|
||||
persistence_threshold,
|
||||
@@ -270,6 +284,8 @@ impl TreeConfig {
|
||||
disable_trie_cache: false,
|
||||
sparse_trie_prune_depth,
|
||||
sparse_trie_max_storage_tries,
|
||||
disable_sparse_trie_cache_pruning: false,
|
||||
state_root_task_timeout,
|
||||
}
|
||||
}
|
||||
|
||||
@@ -618,4 +634,26 @@ impl TreeConfig {
|
||||
self.sparse_trie_max_storage_tries = max_tries;
|
||||
self
|
||||
}
|
||||
|
||||
/// Returns whether sparse trie cache pruning is disabled.
|
||||
pub const fn disable_sparse_trie_cache_pruning(&self) -> bool {
|
||||
self.disable_sparse_trie_cache_pruning
|
||||
}
|
||||
|
||||
/// Setter for whether to disable sparse trie cache pruning.
|
||||
pub const fn with_disable_sparse_trie_cache_pruning(mut self, value: bool) -> Self {
|
||||
self.disable_sparse_trie_cache_pruning = value;
|
||||
self
|
||||
}
|
||||
|
||||
/// Returns the state root task timeout.
|
||||
pub const fn state_root_task_timeout(&self) -> Option<Duration> {
|
||||
self.state_root_task_timeout
|
||||
}
|
||||
|
||||
/// Setter for state root task timeout.
|
||||
pub const fn with_state_root_task_timeout(mut self, timeout: Option<Duration>) -> Self {
|
||||
self.state_root_task_timeout = timeout;
|
||||
self
|
||||
}
|
||||
}
|
||||
|
||||
@@ -23,7 +23,7 @@ use serde::{de::DeserializeOwned, Serialize};
|
||||
|
||||
// Re-export [`ExecutionPayload`] moved to `reth_payload_primitives`
|
||||
#[cfg(feature = "std")]
|
||||
pub use reth_evm::{ConfigureEngineEvm, ExecutableTxIterator, ExecutableTxTuple};
|
||||
pub use reth_evm::{ConfigureEngineEvm, ConvertTx, ExecutableTxIterator, ExecutableTxTuple};
|
||||
pub use reth_payload_primitives::ExecutionPayload;
|
||||
|
||||
mod error;
|
||||
|
||||
@@ -20,7 +20,7 @@ use reth_node_types::{BlockTy, NodeTypes};
|
||||
use reth_payload_builder::PayloadBuilderHandle;
|
||||
use reth_provider::{
|
||||
providers::{BlockchainProvider, ProviderNodeTypes},
|
||||
ProviderFactory,
|
||||
ProviderFactory, StorageSettingsCache,
|
||||
};
|
||||
use reth_prune::PrunerWithFactory;
|
||||
use reth_stages_api::{MetricEventsSender, Pipeline};
|
||||
@@ -94,6 +94,7 @@ where
|
||||
if chain_spec.is_optimism() { EngineApiKind::OpStack } else { EngineApiKind::Ethereum };
|
||||
|
||||
let downloader = BasicBlockDownloader::new(client, consensus.clone());
|
||||
let use_hashed_state = provider.cached_storage_settings().use_hashed_state();
|
||||
|
||||
let persistence_handle =
|
||||
PersistenceHandle::<N::Primitives>::spawn_service(provider, pruner, sync_metrics_tx);
|
||||
@@ -111,6 +112,7 @@ where
|
||||
engine_kind,
|
||||
evm_config,
|
||||
changeset_cache,
|
||||
use_hashed_state,
|
||||
);
|
||||
|
||||
let engine_handler = EngineApiRequestHandler::new(to_tree_tx, from_tree);
|
||||
@@ -201,6 +203,7 @@ mod tests {
|
||||
TreeConfig::default(),
|
||||
Box::new(NoopInvalidBlockHook::default()),
|
||||
changeset_cache.clone(),
|
||||
reth_tasks::Runtime::test(),
|
||||
);
|
||||
|
||||
let (sync_metrics_tx, _sync_metrics_rx) = unbounded_channel();
|
||||
|
||||
@@ -32,7 +32,6 @@ reth-stages-api.workspace = true
|
||||
reth-tasks.workspace = true
|
||||
reth-trie-parallel.workspace = true
|
||||
reth-trie-sparse = { workspace = true, features = ["std", "metrics"] }
|
||||
reth-trie-sparse-parallel = { workspace = true, features = ["std"] }
|
||||
reth-trie.workspace = true
|
||||
reth-trie-common.workspace = true
|
||||
reth-trie-db.workspace = true
|
||||
@@ -142,7 +141,15 @@ test-utils = [
|
||||
"reth-ethereum-primitives/test-utils",
|
||||
"reth-node-ethereum/test-utils",
|
||||
"reth-evm-ethereum/test-utils",
|
||||
"reth-tasks/test-utils",
|
||||
]
|
||||
rocksdb = [
|
||||
"reth-provider/rocksdb",
|
||||
"reth-prune/rocksdb",
|
||||
"reth-stages?/rocksdb",
|
||||
"reth-e2e-test-utils/rocksdb",
|
||||
]
|
||||
edge = ["rocksdb"]
|
||||
|
||||
[[test]]
|
||||
name = "e2e_testsuite"
|
||||
|
||||
@@ -12,8 +12,7 @@ use rand::Rng;
|
||||
use reth_chainspec::ChainSpec;
|
||||
use reth_db_common::init::init_genesis;
|
||||
use reth_engine_tree::tree::{
|
||||
executor::WorkloadExecutor, precompile_cache::PrecompileCacheMap, PayloadProcessor,
|
||||
StateProviderBuilder, TreeConfig,
|
||||
precompile_cache::PrecompileCacheMap, PayloadProcessor, StateProviderBuilder, TreeConfig,
|
||||
};
|
||||
use reth_ethereum_primitives::TransactionSigned;
|
||||
use reth_evm::OnStateHook;
|
||||
@@ -219,7 +218,7 @@ fn bench_state_root(c: &mut Criterion) {
|
||||
setup_provider(&factory, &state_updates).expect("failed to setup provider");
|
||||
|
||||
let payload_processor = PayloadProcessor::new(
|
||||
WorkloadExecutor::default(),
|
||||
reth_tasks::Runtime::test(),
|
||||
EthEvmConfig::new(factory.chain_spec()),
|
||||
&TreeConfig::default(),
|
||||
PrecompileCacheMap::default(),
|
||||
|
||||
@@ -138,7 +138,7 @@ impl<N: ProviderNodeTypes> PipelineSync<N> {
|
||||
let (tx, rx) = oneshot::channel();
|
||||
|
||||
let pipeline = pipeline.take().expect("exists");
|
||||
self.pipeline_task_spawner.spawn_critical_blocking(
|
||||
self.pipeline_task_spawner.spawn_critical_blocking_task(
|
||||
"pipeline task",
|
||||
Box::pin(async move {
|
||||
let result = pipeline.run_as_fut(Some(target)).await;
|
||||
|
||||
@@ -76,8 +76,16 @@ impl CacheConfig for EpochCacheConfig {
|
||||
type FixedCache<K, V, H = DefaultHashBuilder> = fixed_cache::Cache<K, V, H, EpochCacheConfig>;
|
||||
|
||||
/// A wrapper of a state provider and a shared cache.
|
||||
///
|
||||
/// The const generic `PREWARM` controls whether every cache miss is populated. This is only
|
||||
/// relevant for pre-warm transaction execution with the intention to pre-populate the cache with
|
||||
/// data for regular block execution. During regular block execution the cache doesn't need to be
|
||||
/// populated because the actual EVM database [`State`](revm::database::State) also caches
|
||||
/// internally during block execution and the cache is then updated after the block with the entire
|
||||
/// [`BundleState`] output of that block which contains all accessed accounts, code, storage. See
|
||||
/// also [`ExecutionCache::insert_state`].
|
||||
#[derive(Debug)]
|
||||
pub struct CachedStateProvider<S> {
|
||||
pub struct CachedStateProvider<S, const PREWARM: bool = false> {
|
||||
/// The state provider
|
||||
state_provider: S,
|
||||
|
||||
@@ -86,15 +94,9 @@ pub struct CachedStateProvider<S> {
|
||||
|
||||
/// Metrics for the cached state provider
|
||||
metrics: CachedStateMetrics,
|
||||
|
||||
/// If prewarm enabled we populate every cache miss
|
||||
prewarm: bool,
|
||||
}
|
||||
|
||||
impl<S> CachedStateProvider<S>
|
||||
where
|
||||
S: StateProvider,
|
||||
{
|
||||
impl<S> CachedStateProvider<S> {
|
||||
/// Creates a new [`CachedStateProvider`] from an [`ExecutionCache`], state provider, and
|
||||
/// [`CachedStateMetrics`].
|
||||
pub const fn new(
|
||||
@@ -102,27 +104,18 @@ where
|
||||
caches: ExecutionCache,
|
||||
metrics: CachedStateMetrics,
|
||||
) -> Self {
|
||||
Self { state_provider, caches, metrics, prewarm: false }
|
||||
Self { state_provider, caches, metrics }
|
||||
}
|
||||
}
|
||||
|
||||
impl<S> CachedStateProvider<S> {
|
||||
/// Enables pre-warm mode so that every cache miss is populated.
|
||||
///
|
||||
/// This is only relevant for pre-warm transaction execution with the intention to pre-populate
|
||||
/// the cache with data for regular block execution. During regular block execution the
|
||||
/// cache doesn't need to be populated because the actual EVM database
|
||||
/// [`State`](revm::database::State) also caches internally during block execution and the cache
|
||||
/// is then updated after the block with the entire [`BundleState`] output of that block which
|
||||
/// contains all accessed accounts,code,storage. See also [`ExecutionCache::insert_state`].
|
||||
pub const fn prewarm(mut self) -> Self {
|
||||
self.prewarm = true;
|
||||
self
|
||||
}
|
||||
|
||||
/// Returns whether this provider should pre-warm cache misses.
|
||||
const fn is_prewarm(&self) -> bool {
|
||||
self.prewarm
|
||||
impl<S> CachedStateProvider<S, true> {
|
||||
/// Creates a new [`CachedStateProvider`] with prewarming enabled.
|
||||
pub const fn new_prewarm(
|
||||
state_provider: S,
|
||||
caches: ExecutionCache,
|
||||
metrics: CachedStateMetrics,
|
||||
) -> Self {
|
||||
Self { state_provider, caches, metrics }
|
||||
}
|
||||
}
|
||||
|
||||
@@ -307,9 +300,9 @@ impl<K: PartialEq, V> StatsHandler<K, V> for CacheStatsHandler {
|
||||
}
|
||||
}
|
||||
|
||||
impl<S: AccountReader> AccountReader for CachedStateProvider<S> {
|
||||
impl<S: AccountReader, const PREWARM: bool> AccountReader for CachedStateProvider<S, PREWARM> {
|
||||
fn basic_account(&self, address: &Address) -> ProviderResult<Option<Account>> {
|
||||
if self.is_prewarm() {
|
||||
if PREWARM {
|
||||
match self.caches.get_or_try_insert_account_with(*address, || {
|
||||
self.state_provider.basic_account(address)
|
||||
})? {
|
||||
@@ -334,13 +327,13 @@ pub enum CachedStatus<T> {
|
||||
Cached(T),
|
||||
}
|
||||
|
||||
impl<S: StateProvider> StateProvider for CachedStateProvider<S> {
|
||||
impl<S: StateProvider, const PREWARM: bool> StateProvider for CachedStateProvider<S, PREWARM> {
|
||||
fn storage(
|
||||
&self,
|
||||
account: Address,
|
||||
storage_key: StorageKey,
|
||||
) -> ProviderResult<Option<StorageValue>> {
|
||||
if self.is_prewarm() {
|
||||
if PREWARM {
|
||||
match self.caches.get_or_try_insert_storage_with(account, storage_key, || {
|
||||
self.state_provider.storage(account, storage_key).map(Option::unwrap_or_default)
|
||||
})? {
|
||||
@@ -358,11 +351,19 @@ impl<S: StateProvider> StateProvider for CachedStateProvider<S> {
|
||||
self.state_provider.storage(account, storage_key)
|
||||
}
|
||||
}
|
||||
|
||||
fn storage_by_hashed_key(
|
||||
&self,
|
||||
address: Address,
|
||||
hashed_storage_key: StorageKey,
|
||||
) -> ProviderResult<Option<StorageValue>> {
|
||||
self.state_provider.storage_by_hashed_key(address, hashed_storage_key)
|
||||
}
|
||||
}
|
||||
|
||||
impl<S: BytecodeReader> BytecodeReader for CachedStateProvider<S> {
|
||||
impl<S: BytecodeReader, const PREWARM: bool> BytecodeReader for CachedStateProvider<S, PREWARM> {
|
||||
fn bytecode_by_hash(&self, code_hash: &B256) -> ProviderResult<Option<Bytecode>> {
|
||||
if self.is_prewarm() {
|
||||
if PREWARM {
|
||||
match self.caches.get_or_try_insert_code_with(*code_hash, || {
|
||||
self.state_provider.bytecode_by_hash(code_hash)
|
||||
})? {
|
||||
@@ -378,7 +379,9 @@ impl<S: BytecodeReader> BytecodeReader for CachedStateProvider<S> {
|
||||
}
|
||||
}
|
||||
|
||||
impl<S: StateRootProvider> StateRootProvider for CachedStateProvider<S> {
|
||||
impl<S: StateRootProvider, const PREWARM: bool> StateRootProvider
|
||||
for CachedStateProvider<S, PREWARM>
|
||||
{
|
||||
fn state_root(&self, hashed_state: HashedPostState) -> ProviderResult<B256> {
|
||||
self.state_provider.state_root(hashed_state)
|
||||
}
|
||||
@@ -402,7 +405,9 @@ impl<S: StateRootProvider> StateRootProvider for CachedStateProvider<S> {
|
||||
}
|
||||
}
|
||||
|
||||
impl<S: StateProofProvider> StateProofProvider for CachedStateProvider<S> {
|
||||
impl<S: StateProofProvider, const PREWARM: bool> StateProofProvider
|
||||
for CachedStateProvider<S, PREWARM>
|
||||
{
|
||||
fn proof(
|
||||
&self,
|
||||
input: TrieInput,
|
||||
@@ -429,7 +434,9 @@ impl<S: StateProofProvider> StateProofProvider for CachedStateProvider<S> {
|
||||
}
|
||||
}
|
||||
|
||||
impl<S: StorageRootProvider> StorageRootProvider for CachedStateProvider<S> {
|
||||
impl<S: StorageRootProvider, const PREWARM: bool> StorageRootProvider
|
||||
for CachedStateProvider<S, PREWARM>
|
||||
{
|
||||
fn storage_root(
|
||||
&self,
|
||||
address: Address,
|
||||
@@ -457,7 +464,7 @@ impl<S: StorageRootProvider> StorageRootProvider for CachedStateProvider<S> {
|
||||
}
|
||||
}
|
||||
|
||||
impl<S: BlockHashReader> BlockHashReader for CachedStateProvider<S> {
|
||||
impl<S: BlockHashReader, const PREWARM: bool> BlockHashReader for CachedStateProvider<S, PREWARM> {
|
||||
fn block_hash(&self, number: alloy_primitives::BlockNumber) -> ProviderResult<Option<B256>> {
|
||||
self.state_provider.block_hash(number)
|
||||
}
|
||||
@@ -471,7 +478,9 @@ impl<S: BlockHashReader> BlockHashReader for CachedStateProvider<S> {
|
||||
}
|
||||
}
|
||||
|
||||
impl<S: HashedPostStateProvider> HashedPostStateProvider for CachedStateProvider<S> {
|
||||
impl<S: HashedPostStateProvider, const PREWARM: bool> HashedPostStateProvider
|
||||
for CachedStateProvider<S, PREWARM>
|
||||
{
|
||||
fn hashed_post_state(&self, bundle_state: &reth_revm::db::BundleState) -> HashedPostState {
|
||||
self.state_provider.hashed_post_state(bundle_state)
|
||||
}
|
||||
@@ -836,8 +845,10 @@ impl SavedCache {
|
||||
self.caches.update_metrics(&self.metrics);
|
||||
}
|
||||
|
||||
/// Clears all caches, resetting them to empty state.
|
||||
pub(crate) fn clear(&self) {
|
||||
/// Clears all caches, resetting them to empty state,
|
||||
/// and updates the hash of the block this cache belongs to.
|
||||
pub(crate) fn clear_with_hash(&mut self, hash: B256) {
|
||||
self.hash = hash;
|
||||
self.caches.clear();
|
||||
}
|
||||
}
|
||||
|
||||
@@ -199,6 +199,17 @@ impl<S: StateProvider> StateProvider for InstrumentedStateProvider<S> {
|
||||
self.record_storage_fetch(start.elapsed());
|
||||
res
|
||||
}
|
||||
|
||||
fn storage_by_hashed_key(
|
||||
&self,
|
||||
address: Address,
|
||||
hashed_storage_key: StorageKey,
|
||||
) -> ProviderResult<Option<StorageValue>> {
|
||||
let start = Instant::now();
|
||||
let res = self.state_provider.storage_by_hashed_key(address, hashed_storage_key);
|
||||
self.record_storage_fetch(start.elapsed());
|
||||
res
|
||||
}
|
||||
}
|
||||
|
||||
impl<S: BytecodeReader> BytecodeReader for InstrumentedStateProvider<S> {
|
||||
|
||||
@@ -8,9 +8,18 @@ use reth_metrics::{
|
||||
metrics::{Counter, Gauge, Histogram},
|
||||
Metrics,
|
||||
};
|
||||
use reth_primitives_traits::constants::gas_units::MEGAGAS;
|
||||
use reth_trie::updates::TrieUpdates;
|
||||
use std::time::{Duration, Instant};
|
||||
|
||||
/// Upper bounds for each gas bucket. The last bucket is a catch-all for
|
||||
/// everything above the final threshold: <5M, 5-10M, 10-20M, 20-30M, 30-40M, >40M.
|
||||
const GAS_BUCKET_THRESHOLDS: [u64; 5] =
|
||||
[5 * MEGAGAS, 10 * MEGAGAS, 20 * MEGAGAS, 30 * MEGAGAS, 40 * MEGAGAS];
|
||||
|
||||
/// Total number of gas buckets (thresholds + 1 catch-all).
|
||||
const NUM_GAS_BUCKETS: usize = GAS_BUCKET_THRESHOLDS.len() + 1;
|
||||
|
||||
/// Metrics for the `EngineApi`.
|
||||
#[derive(Debug, Default)]
|
||||
pub struct EngineApiMetrics {
|
||||
@@ -90,8 +99,9 @@ impl EngineApiMetrics {
|
||||
pub struct TreeMetrics {
|
||||
/// The highest block number in the canonical chain
|
||||
pub canonical_chain_height: Gauge,
|
||||
/// The number of reorgs
|
||||
pub reorgs: Counter,
|
||||
/// Metrics for reorgs.
|
||||
#[metric(skip)]
|
||||
pub reorgs: ReorgMetrics,
|
||||
/// The latest reorg depth
|
||||
pub latest_reorg_depth: Gauge,
|
||||
/// The current safe block height (this is required by optimism)
|
||||
@@ -100,6 +110,27 @@ pub struct TreeMetrics {
|
||||
pub finalized_block_height: Gauge,
|
||||
}
|
||||
|
||||
/// Metrics for reorgs.
|
||||
#[derive(Debug)]
|
||||
pub struct ReorgMetrics {
|
||||
/// The number of head block reorgs
|
||||
pub head: Counter,
|
||||
/// The number of safe block reorgs
|
||||
pub safe: Counter,
|
||||
/// The number of finalized block reorgs
|
||||
pub finalized: Counter,
|
||||
}
|
||||
|
||||
impl Default for ReorgMetrics {
|
||||
fn default() -> Self {
|
||||
Self {
|
||||
head: metrics::counter!("blockchain_tree_reorgs", "commitment" => "head"),
|
||||
safe: metrics::counter!("blockchain_tree_reorgs", "commitment" => "safe"),
|
||||
finalized: metrics::counter!("blockchain_tree_reorgs", "commitment" => "finalized"),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Metrics for the `EngineApi`.
|
||||
#[derive(Metrics)]
|
||||
#[metrics(scope = "consensus.engine.beacon")]
|
||||
@@ -213,6 +244,65 @@ impl ForkchoiceUpdatedMetrics {
|
||||
}
|
||||
}
|
||||
|
||||
/// Per-gas-bucket newPayload metrics, initialized once via [`Self::new_with_labels`].
|
||||
#[derive(Clone, Metrics)]
|
||||
#[metrics(scope = "consensus.engine.beacon")]
|
||||
pub(crate) struct NewPayloadGasBucketMetrics {
|
||||
/// Latency for new payload calls in this gas bucket.
|
||||
pub(crate) new_payload_gas_bucket_latency: Histogram,
|
||||
/// Gas per second for new payload calls in this gas bucket.
|
||||
pub(crate) new_payload_gas_bucket_gas_per_second: Histogram,
|
||||
}
|
||||
|
||||
/// Holds pre-initialized [`NewPayloadGasBucketMetrics`] instances, one per gas bucket.
|
||||
#[derive(Debug)]
|
||||
pub(crate) struct GasBucketMetrics {
|
||||
buckets: [NewPayloadGasBucketMetrics; NUM_GAS_BUCKETS],
|
||||
}
|
||||
|
||||
impl Default for GasBucketMetrics {
|
||||
fn default() -> Self {
|
||||
Self {
|
||||
buckets: std::array::from_fn(|i| {
|
||||
let label = Self::bucket_label(i);
|
||||
NewPayloadGasBucketMetrics::new_with_labels(&[("gas_bucket", label)])
|
||||
}),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl GasBucketMetrics {
|
||||
fn record(&self, gas_used: u64, elapsed: Duration) {
|
||||
let idx = Self::bucket_index(gas_used);
|
||||
self.buckets[idx].new_payload_gas_bucket_latency.record(elapsed);
|
||||
self.buckets[idx]
|
||||
.new_payload_gas_bucket_gas_per_second
|
||||
.record(gas_used as f64 / elapsed.as_secs_f64());
|
||||
}
|
||||
|
||||
fn bucket_index(gas_used: u64) -> usize {
|
||||
GAS_BUCKET_THRESHOLDS
|
||||
.iter()
|
||||
.position(|&threshold| gas_used < threshold)
|
||||
.unwrap_or(GAS_BUCKET_THRESHOLDS.len())
|
||||
}
|
||||
|
||||
/// Returns a human-readable label like `<5M`, `5-10M`, … `>40M`.
|
||||
fn bucket_label(index: usize) -> String {
|
||||
if index == 0 {
|
||||
let hi = GAS_BUCKET_THRESHOLDS[0] / MEGAGAS;
|
||||
format!("<{hi}M")
|
||||
} else if index < GAS_BUCKET_THRESHOLDS.len() {
|
||||
let lo = GAS_BUCKET_THRESHOLDS[index - 1] / MEGAGAS;
|
||||
let hi = GAS_BUCKET_THRESHOLDS[index] / MEGAGAS;
|
||||
format!("{lo}-{hi}M")
|
||||
} else {
|
||||
let lo = GAS_BUCKET_THRESHOLDS[GAS_BUCKET_THRESHOLDS.len() - 1] / MEGAGAS;
|
||||
format!(">{lo}M")
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Metrics for engine newPayload responses.
|
||||
#[derive(Metrics)]
|
||||
#[metrics(scope = "consensus.engine.beacon")]
|
||||
@@ -223,6 +313,9 @@ pub(crate) struct NewPayloadStatusMetrics {
|
||||
/// Start time of the latest new payload call.
|
||||
#[metric(skip)]
|
||||
pub(crate) latest_start_at: Option<Instant>,
|
||||
/// Gas-bucket-labeled latency and gas/s histograms.
|
||||
#[metric(skip)]
|
||||
pub(crate) gas_bucket: GasBucketMetrics,
|
||||
/// The total count of new payload messages received.
|
||||
pub(crate) new_payload_messages: Counter,
|
||||
/// The total count of new payload messages that we responded to with
|
||||
@@ -299,6 +392,7 @@ impl NewPayloadStatusMetrics {
|
||||
self.new_payload_messages.increment(1);
|
||||
self.new_payload_latency.record(elapsed);
|
||||
self.new_payload_last.set(elapsed);
|
||||
self.gas_bucket.record(gas_used, elapsed);
|
||||
if let Some(latest_forkchoice_updated_at) = latest_forkchoice_updated_at.take() {
|
||||
self.forkchoice_updated_new_payload_time_diff
|
||||
.record(start - latest_forkchoice_updated_at);
|
||||
@@ -343,6 +437,8 @@ pub struct BlockValidationMetrics {
|
||||
pub state_root_parallel_fallback_total: Counter,
|
||||
/// Total number of times the state root task failed but the fallback succeeded.
|
||||
pub state_root_task_fallback_success_total: Counter,
|
||||
/// Total number of times the state root task timed out and a sequential fallback was spawned.
|
||||
pub state_root_task_timeout_total: Counter,
|
||||
/// Latest state root duration, ie the time spent blocked waiting for the state root.
|
||||
pub state_root_duration: Gauge,
|
||||
/// Histogram for state root duration ie the time spent blocked waiting for the state root
|
||||
|
||||
@@ -32,12 +32,13 @@ use reth_provider::{
|
||||
BlockExecutionOutput, BlockExecutionResult, BlockReader, ChangeSetReader,
|
||||
DatabaseProviderFactory, HashedPostStateProvider, ProviderError, StageCheckpointReader,
|
||||
StateProviderBox, StateProviderFactory, StateReader, StorageChangeSetReader,
|
||||
TransactionVariant,
|
||||
StorageSettingsCache, TransactionVariant,
|
||||
};
|
||||
use reth_revm::database::StateProviderDatabase;
|
||||
use reth_stages_api::ControlFlow;
|
||||
use reth_tasks::spawn_os_thread;
|
||||
use reth_trie_db::ChangesetCache;
|
||||
use revm::interpreter::debug_unreachable;
|
||||
use state::TreeState;
|
||||
use std::{fmt::Debug, ops, sync::Arc, time::Instant};
|
||||
|
||||
@@ -270,6 +271,9 @@ where
|
||||
evm_config: C,
|
||||
/// Changeset cache for in-memory trie changesets
|
||||
changeset_cache: ChangesetCache,
|
||||
/// Whether the node uses hashed state as canonical storage (v2 mode).
|
||||
/// Cached at construction to avoid threading `StorageSettingsCache` bounds everywhere.
|
||||
use_hashed_state: bool,
|
||||
}
|
||||
|
||||
impl<N, P: Debug, T: PayloadTypes + Debug, V: Debug, C> std::fmt::Debug
|
||||
@@ -295,6 +299,7 @@ where
|
||||
.field("engine_kind", &self.engine_kind)
|
||||
.field("evm_config", &self.evm_config)
|
||||
.field("changeset_cache", &self.changeset_cache)
|
||||
.field("use_hashed_state", &self.use_hashed_state)
|
||||
.finish()
|
||||
}
|
||||
}
|
||||
@@ -312,7 +317,8 @@ where
|
||||
P::Provider: BlockReader<Block = N::Block, Header = N::BlockHeader>
|
||||
+ StageCheckpointReader
|
||||
+ ChangeSetReader
|
||||
+ StorageChangeSetReader,
|
||||
+ StorageChangeSetReader
|
||||
+ StorageSettingsCache,
|
||||
C: ConfigureEvm<Primitives = N> + 'static,
|
||||
T: PayloadTypes<BuiltPayload: BuiltPayload<Primitives = N>>,
|
||||
V: EngineValidator<T>,
|
||||
@@ -333,6 +339,7 @@ where
|
||||
engine_kind: EngineApiKind,
|
||||
evm_config: C,
|
||||
changeset_cache: ChangesetCache,
|
||||
use_hashed_state: bool,
|
||||
) -> Self {
|
||||
let (incoming_tx, incoming) = crossbeam_channel::unbounded();
|
||||
|
||||
@@ -354,6 +361,7 @@ where
|
||||
engine_kind,
|
||||
evm_config,
|
||||
changeset_cache,
|
||||
use_hashed_state,
|
||||
}
|
||||
}
|
||||
|
||||
@@ -374,6 +382,7 @@ where
|
||||
kind: EngineApiKind,
|
||||
evm_config: C,
|
||||
changeset_cache: ChangesetCache,
|
||||
use_hashed_state: bool,
|
||||
) -> (Sender<FromEngine<EngineApiRequest<T, N>, N::Block>>, UnboundedReceiver<EngineApiEvent<N>>)
|
||||
{
|
||||
let best_block_number = provider.best_block_number().unwrap_or(0);
|
||||
@@ -406,6 +415,7 @@ where
|
||||
kind,
|
||||
evm_config,
|
||||
changeset_cache,
|
||||
use_hashed_state,
|
||||
);
|
||||
let incoming = task.incoming_tx.clone();
|
||||
spawn_os_thread("engine", || task.run());
|
||||
@@ -1401,7 +1411,7 @@ where
|
||||
// Spawn a background task to trigger computation so it's ready when the next payload
|
||||
// arrives.
|
||||
if let Some(overlay) = self.state.tree_state.prepare_canonical_overlay() {
|
||||
rayon::spawn(move || {
|
||||
tokio::task::spawn_blocking(move || {
|
||||
let _ = overlay.get();
|
||||
});
|
||||
}
|
||||
@@ -1509,7 +1519,7 @@ where
|
||||
.engine
|
||||
.failed_forkchoice_updated_response_deliveries
|
||||
.increment(1);
|
||||
error!(target: "engine::tree", ?state, elapsed=?start.elapsed(), "Failed to send event: {err:?}");
|
||||
warn!(target: "engine::tree", ?state, elapsed=?start.elapsed(), "Failed to deliver forkchoiceUpdated response, receiver dropped (request cancelled): {err:?}");
|
||||
}
|
||||
}
|
||||
BeaconEngineMessage::NewPayload { payload, tx } => {
|
||||
@@ -1533,7 +1543,7 @@ where
|
||||
BeaconOnNewPayloadError::Internal(Box::new(e))
|
||||
}))
|
||||
{
|
||||
error!(target: "engine::tree", payload=?num_hash, elapsed=?start.elapsed(), "Failed to send event: {err:?}");
|
||||
warn!(target: "engine::tree", payload=?num_hash, elapsed=?start.elapsed(), "Failed to deliver newPayload response, receiver dropped (request cancelled): {err:?}");
|
||||
self.metrics
|
||||
.engine
|
||||
.failed_new_payload_response_deliveries
|
||||
@@ -2376,9 +2386,14 @@ where
|
||||
let old_first = old.first().map(|first| first.recovered_block().num_hash());
|
||||
trace!(target: "engine::tree", ?new_first, ?old_first, "Reorg detected, new and old first blocks");
|
||||
|
||||
self.update_reorg_metrics(old.len());
|
||||
self.update_reorg_metrics(old.len(), old_first);
|
||||
self.reinsert_reorged_blocks(new.clone());
|
||||
self.reinsert_reorged_blocks(old.clone());
|
||||
|
||||
// When use_hashed_state is enabled, skip reinserting the old chain — the
|
||||
// bundle state references plain state reverts which don't exist.
|
||||
if !self.use_hashed_state {
|
||||
self.reinsert_reorged_blocks(old.clone());
|
||||
}
|
||||
}
|
||||
|
||||
// update the tracked in-memory state with the new chain
|
||||
@@ -2398,9 +2413,23 @@ where
|
||||
));
|
||||
}
|
||||
|
||||
/// This updates metrics based on the given reorg length.
|
||||
fn update_reorg_metrics(&self, old_chain_length: usize) {
|
||||
self.metrics.tree.reorgs.increment(1);
|
||||
/// This updates metrics based on the given reorg length and first reorged block number.
|
||||
fn update_reorg_metrics(&self, old_chain_length: usize, first_reorged_block: Option<NumHash>) {
|
||||
if let Some(first_reorged_block) = first_reorged_block.map(|block| block.number) {
|
||||
if let Some(finalized) = self.canonical_in_memory_state.get_finalized_num_hash() &&
|
||||
first_reorged_block <= finalized.number
|
||||
{
|
||||
self.metrics.tree.reorgs.finalized.increment(1);
|
||||
} else if let Some(safe) = self.canonical_in_memory_state.get_safe_num_hash() &&
|
||||
first_reorged_block <= safe.number
|
||||
{
|
||||
self.metrics.tree.reorgs.safe.increment(1);
|
||||
} else {
|
||||
self.metrics.tree.reorgs.head.increment(1);
|
||||
}
|
||||
} else {
|
||||
debug_unreachable!("Reorged chain doesn't have any blocks");
|
||||
}
|
||||
self.metrics.tree.latest_reorg_depth.set(old_chain_length as f64);
|
||||
}
|
||||
|
||||
|
||||
@@ -1,47 +0,0 @@
|
||||
//! Executor for mixed I/O and CPU workloads.
|
||||
|
||||
use reth_trie_parallel::root::get_tokio_runtime_handle;
|
||||
use tokio::{runtime::Handle, task::JoinHandle};
|
||||
|
||||
/// An executor for mixed I/O and CPU workloads.
|
||||
///
|
||||
/// This type uses tokio to spawn blocking tasks and will reuse an existing tokio
|
||||
/// runtime if available or create its own.
|
||||
#[derive(Debug, Clone)]
|
||||
pub struct WorkloadExecutor {
|
||||
inner: WorkloadExecutorInner,
|
||||
}
|
||||
|
||||
impl Default for WorkloadExecutor {
|
||||
fn default() -> Self {
|
||||
Self { inner: WorkloadExecutorInner::new() }
|
||||
}
|
||||
}
|
||||
|
||||
impl WorkloadExecutor {
|
||||
/// Returns the handle to the tokio runtime
|
||||
pub(super) const fn handle(&self) -> &Handle {
|
||||
&self.inner.handle
|
||||
}
|
||||
|
||||
/// Runs the provided function on an executor dedicated to blocking operations.
|
||||
#[track_caller]
|
||||
pub fn spawn_blocking<F, R>(&self, func: F) -> JoinHandle<R>
|
||||
where
|
||||
F: FnOnce() -> R + Send + 'static,
|
||||
R: Send + 'static,
|
||||
{
|
||||
self.inner.handle.spawn_blocking(func)
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone)]
|
||||
struct WorkloadExecutorInner {
|
||||
handle: Handle,
|
||||
}
|
||||
|
||||
impl WorkloadExecutorInner {
|
||||
fn new() -> Self {
|
||||
Self { handle: get_tokio_runtime_handle() }
|
||||
}
|
||||
}
|
||||
@@ -11,11 +11,10 @@ use crate::tree::{
|
||||
StateProviderBuilder, TreeConfig,
|
||||
};
|
||||
use alloy_eip7928::BlockAccessList;
|
||||
use alloy_eips::eip1898::BlockWithParent;
|
||||
use alloy_eips::{eip1898::BlockWithParent, eip4895::Withdrawal};
|
||||
use alloy_evm::block::StateChangeSource;
|
||||
use alloy_primitives::B256;
|
||||
use crossbeam_channel::{Receiver as CrossbeamReceiver, Sender as CrossbeamSender};
|
||||
use executor::WorkloadExecutor;
|
||||
use metrics::{Counter, Histogram};
|
||||
use multiproof::{SparseTrieUpdate, *};
|
||||
use parking_lot::RwLock;
|
||||
@@ -24,8 +23,8 @@ use rayon::prelude::*;
|
||||
use reth_evm::{
|
||||
block::ExecutableTxParts,
|
||||
execute::{ExecutableTxFor, WithTxEnv},
|
||||
ConfigureEvm, EvmEnvFor, ExecutableTxIterator, ExecutableTxTuple, OnStateHook, SpecFor,
|
||||
TxEnvFor,
|
||||
ConfigureEvm, ConvertTx, EvmEnvFor, ExecutableTxIterator, ExecutableTxTuple, OnStateHook,
|
||||
SpecFor, TxEnvFor,
|
||||
};
|
||||
use reth_metrics::Metrics;
|
||||
use reth_primitives_traits::NodePrimitives;
|
||||
@@ -34,13 +33,15 @@ use reth_provider::{
|
||||
StateProviderFactory, StateReader,
|
||||
};
|
||||
use reth_revm::{db::BundleState, state::EvmState};
|
||||
use reth_tasks::Runtime;
|
||||
use reth_trie::{hashed_cursor::HashedCursorFactory, trie_cursor::TrieCursorFactory};
|
||||
use reth_trie_parallel::{
|
||||
proof_task::{ProofTaskCtx, ProofWorkerHandle},
|
||||
root::ParallelStateRootError,
|
||||
};
|
||||
use reth_trie_sparse::{RevealableSparseTrie, SparseStateTrie};
|
||||
use reth_trie_sparse_parallel::{ParallelSparseTrie, ParallelismThresholds};
|
||||
use reth_trie_sparse::{
|
||||
ParallelSparseTrie, ParallelismThresholds, RevealableSparseTrie, SparseStateTrie,
|
||||
};
|
||||
use std::{
|
||||
collections::BTreeMap,
|
||||
ops::Not,
|
||||
@@ -49,12 +50,11 @@ use std::{
|
||||
mpsc::{self, channel},
|
||||
Arc,
|
||||
},
|
||||
time::Instant,
|
||||
time::{Duration, Instant},
|
||||
};
|
||||
use tracing::{debug, debug_span, instrument, warn, Span};
|
||||
|
||||
pub mod bal;
|
||||
pub mod executor;
|
||||
pub mod multiproof;
|
||||
mod preserved_sparse_trie;
|
||||
pub mod prewarm;
|
||||
@@ -94,6 +94,9 @@ pub const SPARSE_TRIE_MAX_NODES_SHRINK_CAPACITY: usize = 1_000_000;
|
||||
/// 144MB.
|
||||
pub const SPARSE_TRIE_MAX_VALUES_SHRINK_CAPACITY: usize = 1_000_000;
|
||||
|
||||
/// Blocks with fewer transactions than this skip prewarming, since the fixed overhead of spawning
|
||||
/// prewarm workers exceeds the execution time saved.
|
||||
pub const SMALL_BLOCK_TX_THRESHOLD: usize = 5;
|
||||
/// Type alias for [`PayloadHandle`] returned by payload processor spawn methods.
|
||||
type IteratorPayloadHandle<Evm, I, N> = PayloadHandle<
|
||||
WithTxEnv<TxEnvFor<Evm>, <I as ExecutableTxIterator<Evm>>::Recovered>,
|
||||
@@ -108,7 +111,7 @@ where
|
||||
Evm: ConfigureEvm,
|
||||
{
|
||||
/// The executor used by to spawn tasks.
|
||||
executor: WorkloadExecutor,
|
||||
executor: Runtime,
|
||||
/// The most recent cache used for execution.
|
||||
execution_cache: PayloadExecutionCache,
|
||||
/// Metrics for trie operations
|
||||
@@ -135,6 +138,8 @@ where
|
||||
sparse_trie_prune_depth: usize,
|
||||
/// Maximum storage tries to retain after pruning.
|
||||
sparse_trie_max_storage_tries: usize,
|
||||
/// Whether sparse trie cache pruning is fully disabled.
|
||||
disable_sparse_trie_cache_pruning: bool,
|
||||
/// Whether to disable cache metrics recording.
|
||||
disable_cache_metrics: bool,
|
||||
}
|
||||
@@ -145,13 +150,13 @@ where
|
||||
Evm: ConfigureEvm<Primitives = N>,
|
||||
{
|
||||
/// Returns a reference to the workload executor driving payload tasks.
|
||||
pub const fn executor(&self) -> &WorkloadExecutor {
|
||||
pub const fn executor(&self) -> &Runtime {
|
||||
&self.executor
|
||||
}
|
||||
|
||||
/// Creates a new payload processor.
|
||||
pub fn new(
|
||||
executor: WorkloadExecutor,
|
||||
executor: Runtime,
|
||||
evm_config: Evm,
|
||||
config: &TreeConfig,
|
||||
precompile_cache_map: PrecompileCacheMap<SpecFor<Evm>>,
|
||||
@@ -170,6 +175,7 @@ where
|
||||
prewarm_max_concurrency: config.prewarm_max_concurrency(),
|
||||
sparse_trie_prune_depth: config.sparse_trie_prune_depth(),
|
||||
sparse_trie_max_storage_tries: config.sparse_trie_max_storage_tries(),
|
||||
disable_sparse_trie_cache_pruning: config.disable_sparse_trie_cache_pruning(),
|
||||
disable_cache_metrics: config.disable_cache_metrics(),
|
||||
}
|
||||
}
|
||||
@@ -235,7 +241,8 @@ where
|
||||
+ 'static,
|
||||
{
|
||||
// start preparing transactions immediately
|
||||
let (prewarm_rx, execution_rx) = self.spawn_tx_iterator(transactions);
|
||||
let (prewarm_rx, execution_rx) =
|
||||
self.spawn_tx_iterator(transactions, env.transaction_count);
|
||||
|
||||
let span = Span::current();
|
||||
let (to_sparse_trie, sparse_trie_rx) = channel();
|
||||
@@ -252,15 +259,13 @@ where
|
||||
// When BAL is present, use BAL prewarming and send BAL to multiproof
|
||||
debug!(target: "engine::tree::payload_processor", "BAL present, using BAL prewarming");
|
||||
|
||||
// Send BAL message immediately to MultiProofTask
|
||||
let _ = to_multi_proof.send(MultiProofMessage::BlockAccessList(Arc::clone(&bal)));
|
||||
|
||||
// Spawn with BAL prewarming
|
||||
// The prewarm task converts the BAL to HashedPostState and sends it on
|
||||
// to_multi_proof after slot prefetching completes.
|
||||
self.spawn_caching_with(
|
||||
env,
|
||||
prewarm_rx,
|
||||
provider_builder.clone(),
|
||||
None, // Don't send proof targets when BAL is present
|
||||
Some(to_multi_proof.clone()),
|
||||
Some(bal),
|
||||
v2_proofs_enabled,
|
||||
)
|
||||
@@ -278,15 +283,7 @@ where
|
||||
|
||||
// Create and spawn the storage proof task
|
||||
let task_ctx = ProofTaskCtx::new(multiproof_provider_factory);
|
||||
let storage_worker_count = config.storage_worker_count();
|
||||
let account_worker_count = config.account_worker_count();
|
||||
let proof_handle = ProofWorkerHandle::new(
|
||||
self.executor.handle().clone(),
|
||||
task_ctx,
|
||||
storage_worker_count,
|
||||
account_worker_count,
|
||||
v2_proofs_enabled,
|
||||
);
|
||||
let proof_handle = ProofWorkerHandle::new(&self.executor, task_ctx, v2_proofs_enabled);
|
||||
|
||||
if config.disable_trie_cache() {
|
||||
let multi_proof_task = MultiProofTask::new(
|
||||
@@ -352,7 +349,8 @@ where
|
||||
where
|
||||
P: BlockReader + StateProviderFactory + StateReader + Clone + 'static,
|
||||
{
|
||||
let (prewarm_rx, execution_rx) = self.spawn_tx_iterator(transactions);
|
||||
let (prewarm_rx, execution_rx) =
|
||||
self.spawn_tx_iterator(transactions, env.transaction_count);
|
||||
// This path doesn't use multiproof, so V2 proofs flag doesn't matter
|
||||
let prewarm_handle =
|
||||
self.spawn_caching_with(env, prewarm_rx, provider_builder, None, bal, false);
|
||||
@@ -365,11 +363,23 @@ where
|
||||
}
|
||||
}
|
||||
|
||||
/// Transaction count threshold below which sequential signature recovery is used.
|
||||
///
|
||||
/// For blocks with fewer than this many transactions, the rayon parallel iterator overhead
|
||||
/// (work-stealing setup, channel-based reorder) exceeds the cost of sequential ECDSA
|
||||
/// recovery. Inspired by Nethermind's `RecoverSignature` which uses sequential `foreach`
|
||||
/// for small blocks.
|
||||
const SMALL_BLOCK_TX_THRESHOLD: usize = 30;
|
||||
|
||||
/// Spawns a task advancing transaction env iterator and streaming updates through a channel.
|
||||
///
|
||||
/// For blocks with fewer than [`Self::SMALL_BLOCK_TX_THRESHOLD`] transactions, uses
|
||||
/// sequential iteration to avoid rayon overhead.
|
||||
#[expect(clippy::type_complexity)]
|
||||
fn spawn_tx_iterator<I: ExecutableTxIterator<Evm>>(
|
||||
&self,
|
||||
transactions: I,
|
||||
transaction_count: usize,
|
||||
) -> (
|
||||
mpsc::Receiver<WithTxEnv<TxEnvFor<Evm>, I::Recovered>>,
|
||||
mpsc::Receiver<Result<WithTxEnv<TxEnvFor<Evm>, I::Recovered>, I::Error>>,
|
||||
@@ -378,22 +388,51 @@ where
|
||||
let (prewarm_tx, prewarm_rx) = mpsc::channel();
|
||||
let (execute_tx, execute_rx) = mpsc::channel();
|
||||
|
||||
// Spawn a task that `convert`s all transactions in parallel and sends them out-of-order.
|
||||
rayon::spawn(move || {
|
||||
let (transactions, convert) = transactions.into();
|
||||
transactions.into_par_iter().enumerate().for_each_with(ooo_tx, |ooo_tx, (idx, tx)| {
|
||||
let tx = convert(tx);
|
||||
let tx = tx.map(|tx| {
|
||||
let (tx_env, tx) = tx.into_parts();
|
||||
WithTxEnv { tx_env, tx: Arc::new(tx) }
|
||||
});
|
||||
// Only send Ok(_) variants to prewarming task.
|
||||
if let Ok(tx) = &tx {
|
||||
let _ = prewarm_tx.send(tx.clone());
|
||||
if transaction_count == 0 {
|
||||
// Empty block — nothing to do.
|
||||
} else if transaction_count < Self::SMALL_BLOCK_TX_THRESHOLD {
|
||||
// Sequential path for small blocks — avoids rayon work-stealing setup and
|
||||
// channel-based reorder overhead when it costs more than the ECDSA recovery itself.
|
||||
debug!(
|
||||
target: "engine::tree::payload_processor",
|
||||
transaction_count,
|
||||
"using sequential sig recovery for small block"
|
||||
);
|
||||
self.executor.spawn_blocking(move || {
|
||||
let (transactions, convert) = transactions.into_parts();
|
||||
for (idx, tx) in transactions.into_iter().enumerate() {
|
||||
let tx = convert.convert(tx);
|
||||
let tx = tx.map(|tx| {
|
||||
let (tx_env, tx) = tx.into_parts();
|
||||
WithTxEnv { tx_env, tx: Arc::new(tx) }
|
||||
});
|
||||
if let Ok(tx) = &tx {
|
||||
let _ = prewarm_tx.send(tx.clone());
|
||||
}
|
||||
let _ = ooo_tx.send((idx, tx));
|
||||
}
|
||||
let _ = ooo_tx.send((idx, tx));
|
||||
});
|
||||
});
|
||||
} else {
|
||||
// Parallel path — spawn on rayon for parallel signature recovery.
|
||||
rayon::spawn(move || {
|
||||
let (transactions, convert) = transactions.into_parts();
|
||||
transactions.into_par_iter().enumerate().for_each_with(
|
||||
ooo_tx,
|
||||
|ooo_tx, (idx, tx)| {
|
||||
let tx = convert.convert(tx);
|
||||
let tx = tx.map(|tx| {
|
||||
let (tx_env, tx) = tx.into_parts();
|
||||
WithTxEnv { tx_env, tx: Arc::new(tx) }
|
||||
});
|
||||
// Only send Ok(_) variants to prewarming task.
|
||||
if let Ok(tx) = &tx {
|
||||
let _ = prewarm_tx.send(tx.clone());
|
||||
}
|
||||
let _ = ooo_tx.send((idx, tx));
|
||||
},
|
||||
);
|
||||
});
|
||||
}
|
||||
|
||||
// Spawn a task that processes out-of-order transactions from the task above and sends them
|
||||
// to the execution task in order.
|
||||
@@ -405,8 +444,8 @@ where
|
||||
let _ = execute_tx.send(tx);
|
||||
next_for_execution += 1;
|
||||
|
||||
while let Some(entry) = queue.first_entry() &&
|
||||
*entry.key() == next_for_execution
|
||||
while let Some(entry) = queue.first_entry()
|
||||
&& *entry.key() == next_for_execution
|
||||
{
|
||||
let _ = execute_tx.send(entry.remove());
|
||||
next_for_execution += 1;
|
||||
@@ -424,7 +463,7 @@ where
|
||||
fn spawn_caching_with<P>(
|
||||
&self,
|
||||
env: ExecutionEnv<Evm>,
|
||||
mut transactions: mpsc::Receiver<impl ExecutableTxFor<Evm> + Clone + Send + 'static>,
|
||||
transactions: mpsc::Receiver<impl ExecutableTxFor<Evm> + Clone + Send + 'static>,
|
||||
provider_builder: StateProviderBuilder<N, P>,
|
||||
to_multi_proof: Option<CrossbeamSender<MultiProofMessage>>,
|
||||
bal: Option<Arc<BlockAccessList>>,
|
||||
@@ -433,11 +472,8 @@ where
|
||||
where
|
||||
P: BlockReader + StateProviderFactory + StateReader + Clone + 'static,
|
||||
{
|
||||
if self.disable_transaction_prewarming {
|
||||
// if no transactions should be executed we clear them but still spawn the task for
|
||||
// caching updates
|
||||
transactions = mpsc::channel().1;
|
||||
}
|
||||
let skip_prewarm =
|
||||
self.disable_transaction_prewarming || env.transaction_count < SMALL_BLOCK_TX_THRESHOLD;
|
||||
|
||||
let saved_cache = self.disable_state_cache.not().then(|| self.cache_for(env.parent_hash));
|
||||
|
||||
@@ -466,7 +502,9 @@ where
|
||||
{
|
||||
let to_prewarm_task = to_prewarm_task.clone();
|
||||
self.executor.spawn_blocking(move || {
|
||||
let mode = if let Some(bal) = bal {
|
||||
let mode = if skip_prewarm {
|
||||
PrewarmMode::Skipped
|
||||
} else if let Some(bal) = bal {
|
||||
PrewarmMode::BlockAccessList(bal)
|
||||
} else {
|
||||
PrewarmMode::Transactions(transactions)
|
||||
@@ -515,6 +553,7 @@ where
|
||||
let disable_trie_cache = config.disable_trie_cache();
|
||||
let prune_depth = self.sparse_trie_prune_depth;
|
||||
let max_storage_tries = self.sparse_trie_max_storage_tries;
|
||||
let disable_cache_pruning = self.disable_sparse_trie_cache_pruning;
|
||||
let chunk_size =
|
||||
config.multiproof_chunking_enabled().then_some(config.multiproof_chunk_size());
|
||||
let executor = self.executor.clone();
|
||||
@@ -611,6 +650,7 @@ where
|
||||
max_storage_tries,
|
||||
SPARSE_TRIE_MAX_NODES_SHRINK_CAPACITY,
|
||||
SPARSE_TRIE_MAX_VALUES_SHRINK_CAPACITY,
|
||||
disable_cache_pruning,
|
||||
);
|
||||
trie_metrics
|
||||
.into_trie_for_reuse_duration_histogram
|
||||
@@ -724,6 +764,18 @@ impl<Tx, Err, R: Send + Sync + 'static> PayloadHandle<Tx, Err, R> {
|
||||
.map_err(|_| ParallelStateRootError::Other("sparse trie task dropped".to_string()))?
|
||||
}
|
||||
|
||||
/// Takes the state root receiver out of the handle for use with custom waiting logic
|
||||
/// (e.g., timeout-based waiting).
|
||||
///
|
||||
/// # Panics
|
||||
///
|
||||
/// If payload processing was started without background tasks.
|
||||
pub const fn take_state_root_rx(
|
||||
&mut self,
|
||||
) -> mpsc::Receiver<Result<StateRootComputeOutcome, ParallelStateRootError>> {
|
||||
self.state_root.take().expect("state_root is None")
|
||||
}
|
||||
|
||||
/// Returns a state hook to be used to send state updates to this task.
|
||||
///
|
||||
/// If a multiproof task is spawned the hook will notify it about new states.
|
||||
@@ -874,7 +926,7 @@ impl PayloadExecutionCache {
|
||||
#[instrument(level = "debug", target = "engine::tree::payload_processor", skip(self))]
|
||||
pub(crate) fn get_cache_for(&self, parent_hash: B256) -> Option<SavedCache> {
|
||||
let start = Instant::now();
|
||||
let cache = self.inner.read();
|
||||
let mut cache = self.inner.write();
|
||||
|
||||
let elapsed = start.elapsed();
|
||||
self.metrics.execution_cache_wait_duration.record(elapsed.as_secs_f64());
|
||||
@@ -882,7 +934,7 @@ impl PayloadExecutionCache {
|
||||
warn!(blocked_for=?elapsed, "Blocked waiting for execution cache mutex");
|
||||
}
|
||||
|
||||
if let Some(c) = cache.as_ref() {
|
||||
if let Some(c) = cache.as_mut() {
|
||||
let cached_hash = c.executed_block_hash();
|
||||
// Check that the cache hash matches the parent hash of the current block. It won't
|
||||
// match in case it's a fork block.
|
||||
@@ -903,13 +955,13 @@ impl PayloadExecutionCache {
|
||||
);
|
||||
|
||||
if available {
|
||||
// If the has is available (no other threads are using it), but has a mismatching
|
||||
// parent hash, we can just clear it and keep using without re-creating from
|
||||
// scratch.
|
||||
if !hash_matches {
|
||||
c.clear();
|
||||
// Fork block: clear and update the hash on the ORIGINAL before cloning.
|
||||
// This prevents the canonical chain from matching on the stale hash
|
||||
// and picking up polluted data if the fork block fails.
|
||||
c.clear_with_hash(parent_hash);
|
||||
}
|
||||
return Some(c.clone())
|
||||
return Some(c.clone());
|
||||
} else if hash_matches {
|
||||
self.metrics.execution_cache_in_use.increment(1);
|
||||
}
|
||||
@@ -920,10 +972,25 @@ impl PayloadExecutionCache {
|
||||
None
|
||||
}
|
||||
|
||||
/// Clears the tracked cache
|
||||
#[expect(unused)]
|
||||
pub(crate) fn clear(&self) {
|
||||
self.inner.write().take();
|
||||
/// Waits until the execution cache becomes available for use.
|
||||
///
|
||||
/// This acquires a write lock to ensure exclusive access, then immediately releases it.
|
||||
/// This is useful for synchronization before starting payload processing.
|
||||
///
|
||||
/// Returns the time spent waiting for the lock.
|
||||
pub fn wait_for_availability(&self) -> Duration {
|
||||
let start = Instant::now();
|
||||
// Acquire write lock to wait for any current holders to finish
|
||||
let _guard = self.inner.write();
|
||||
let elapsed = start.elapsed();
|
||||
if elapsed.as_millis() > 5 {
|
||||
debug!(
|
||||
target: "engine::tree::payload_processor",
|
||||
blocked_for=?elapsed,
|
||||
"Waited for execution cache to become available"
|
||||
);
|
||||
}
|
||||
elapsed
|
||||
}
|
||||
|
||||
/// Updates the cache with a closure that has exclusive access to the guard.
|
||||
@@ -976,6 +1043,9 @@ pub struct ExecutionEnv<Evm: ConfigureEvm> {
|
||||
/// Used to determine parallel worker count for prewarming.
|
||||
/// A value of 0 indicates the count is unknown.
|
||||
pub transaction_count: usize,
|
||||
/// Withdrawals included in the block.
|
||||
/// Used to generate prefetch targets for withdrawal addresses.
|
||||
pub withdrawals: Option<Vec<Withdrawal>>,
|
||||
}
|
||||
|
||||
impl<Evm: ConfigureEvm> Default for ExecutionEnv<Evm>
|
||||
@@ -989,6 +1059,7 @@ where
|
||||
parent_hash: Default::default(),
|
||||
parent_state_root: Default::default(),
|
||||
transaction_count: 0,
|
||||
withdrawals: None,
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -998,9 +1069,7 @@ mod tests {
|
||||
use super::PayloadExecutionCache;
|
||||
use crate::tree::{
|
||||
cached_state::{CachedStateMetrics, ExecutionCache, SavedCache},
|
||||
payload_processor::{
|
||||
evm_state_to_hashed_post_state, executor::WorkloadExecutor, PayloadProcessor,
|
||||
},
|
||||
payload_processor::{evm_state_to_hashed_post_state, PayloadProcessor},
|
||||
precompile_cache::PrecompileCacheMap,
|
||||
StateProviderBuilder, TreeConfig,
|
||||
};
|
||||
@@ -1074,10 +1143,18 @@ mod tests {
|
||||
|
||||
execution_cache.update_with_guard(|slot| *slot = Some(make_saved_cache(hash)));
|
||||
|
||||
// When the parent hash doesn't match, the cache is cleared and returned for reuse
|
||||
// When the parent hash doesn't match (fork block), the cache is cleared,
|
||||
// hash updated on the original, and clone returned for reuse
|
||||
let different_hash = B256::from([4u8; 32]);
|
||||
let cache = execution_cache.get_cache_for(different_hash);
|
||||
assert!(cache.is_some(), "cache should be returned for reuse after clearing")
|
||||
assert!(cache.is_some(), "cache should be returned for reuse after clearing");
|
||||
|
||||
drop(cache);
|
||||
|
||||
// The stored cache now has the fork block's parent hash.
|
||||
// Canonical chain looking for original hash sees a mismatch → clears and reuses.
|
||||
let original = execution_cache.get_cache_for(hash);
|
||||
assert!(original.is_some(), "canonical chain gets cache back via mismatch+clear");
|
||||
}
|
||||
|
||||
#[test]
|
||||
@@ -1102,7 +1179,7 @@ mod tests {
|
||||
#[test]
|
||||
fn on_inserted_executed_block_populates_cache() {
|
||||
let payload_processor = PayloadProcessor::new(
|
||||
WorkloadExecutor::default(),
|
||||
reth_tasks::Runtime::test(),
|
||||
EthEvmConfig::new(Arc::new(ChainSpec::default())),
|
||||
&TreeConfig::default(),
|
||||
PrecompileCacheMap::default(),
|
||||
@@ -1131,7 +1208,7 @@ mod tests {
|
||||
#[test]
|
||||
fn on_inserted_executed_block_skips_on_parent_mismatch() {
|
||||
let payload_processor = PayloadProcessor::new(
|
||||
WorkloadExecutor::default(),
|
||||
reth_tasks::Runtime::test(),
|
||||
EthEvmConfig::new(Arc::new(ChainSpec::default())),
|
||||
&TreeConfig::default(),
|
||||
PrecompileCacheMap::default(),
|
||||
@@ -1266,7 +1343,7 @@ mod tests {
|
||||
}
|
||||
|
||||
let mut payload_processor = PayloadProcessor::new(
|
||||
WorkloadExecutor::default(),
|
||||
reth_tasks::Runtime::test(),
|
||||
EthEvmConfig::new(factory.chain_spec()),
|
||||
&TreeConfig::default(),
|
||||
PrecompileCacheMap::default(),
|
||||
@@ -1301,4 +1378,61 @@ mod tests {
|
||||
"State root mismatch: task={root_from_task}, base={root_from_regular}"
|
||||
);
|
||||
}
|
||||
|
||||
/// Tests the full prewarm lifecycle for a fork block:
|
||||
///
|
||||
/// 1. Cache is at canonical block 4.
|
||||
/// 2. Fork block (parent = block 2) checks out the cache via `get_cache_for`, simulating what
|
||||
/// `PrewarmCacheTask` does when it receives a `SavedCache`.
|
||||
/// 3. Prewarm populates the shared cache with fork-specific state.
|
||||
/// 4. While the prewarm clone is alive, the cache is unavailable (`usage_guard` > 1).
|
||||
/// 5. Prewarm drops without calling `save_cache` (fork block was invalid).
|
||||
/// 6. Canonical block 5 (parent = block 4) must get a cache with correct hash and no stale fork
|
||||
/// data.
|
||||
#[test]
|
||||
fn fork_prewarm_dropped_without_save_does_not_corrupt_cache() {
|
||||
let execution_cache = PayloadExecutionCache::default();
|
||||
|
||||
// Canonical chain at block 4.
|
||||
let block4_hash = B256::from([4u8; 32]);
|
||||
execution_cache.update_with_guard(|slot| *slot = Some(make_saved_cache(block4_hash)));
|
||||
|
||||
// Fork block arrives with parent = block 2. Prewarm task checks out the cache.
|
||||
// This simulates PrewarmCacheTask receiving a SavedCache clone from get_cache_for.
|
||||
let fork_parent = B256::from([2u8; 32]);
|
||||
let prewarm_cache = execution_cache.get_cache_for(fork_parent);
|
||||
assert!(prewarm_cache.is_some(), "prewarm should obtain cache for fork block");
|
||||
let prewarm_cache = prewarm_cache.unwrap();
|
||||
assert_eq!(prewarm_cache.executed_block_hash(), fork_parent);
|
||||
|
||||
// Prewarm populates cache with fork-specific state (ancestor data for block 2).
|
||||
// Since ExecutionCache uses Arc<Inner>, this data is shared with the stored original.
|
||||
let fork_addr = Address::from([0xBB; 20]);
|
||||
let fork_key = B256::from([0xCC; 32]);
|
||||
prewarm_cache.cache().insert_storage(fork_addr, fork_key, Some(U256::from(999)));
|
||||
|
||||
// While prewarm holds the clone, the usage_guard count > 1 → cache is in use.
|
||||
let during_prewarm = execution_cache.get_cache_for(block4_hash);
|
||||
assert!(
|
||||
during_prewarm.is_none(),
|
||||
"cache must be unavailable while prewarm holds a reference"
|
||||
);
|
||||
|
||||
// Fork block fails — prewarm task drops without calling save_cache/update_with_guard.
|
||||
drop(prewarm_cache);
|
||||
|
||||
// Canonical block 5 arrives (parent = block 4).
|
||||
// Stored hash = fork_parent (our fix), so get_cache_for sees a mismatch,
|
||||
// clears the stale fork data, and returns a cache with hash = block4_hash.
|
||||
let block5_cache = execution_cache.get_cache_for(block4_hash);
|
||||
assert!(
|
||||
block5_cache.is_some(),
|
||||
"canonical chain must get cache after fork prewarm is dropped"
|
||||
);
|
||||
assert_eq!(
|
||||
block5_cache.as_ref().unwrap().executed_block_hash(),
|
||||
block4_hash,
|
||||
"cache must carry the canonical parent hash, not the fork parent"
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -115,6 +115,8 @@ pub enum MultiProofMessage {
|
||||
/// The state update that was used to calculate the proof
|
||||
state: HashedPostState,
|
||||
},
|
||||
/// Pre-hashed state update from BAL conversion that can be applied directly without proofs.
|
||||
HashedStateUpdate(HashedPostState),
|
||||
/// Block Access List (EIP-7928; BAL) containing complete state changes for the block.
|
||||
///
|
||||
/// When received, the task generates a single state update from the BAL and processes it.
|
||||
@@ -1189,6 +1191,11 @@ impl MultiProofTask {
|
||||
}
|
||||
false
|
||||
}
|
||||
MultiProofMessage::HashedStateUpdate(hashed_state) => {
|
||||
batch_metrics.state_update_proofs_requested +=
|
||||
self.on_hashed_state_update(Source::BlockAccessList, hashed_state);
|
||||
false
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1534,23 +1541,18 @@ mod tests {
|
||||
providers::OverlayStateProviderFactory, test_utils::create_test_provider_factory,
|
||||
BlockNumReader, BlockReader, ChangeSetReader, DatabaseProviderFactory, LatestStateProvider,
|
||||
PruneCheckpointReader, StageCheckpointReader, StateProviderBox, StorageChangeSetReader,
|
||||
StorageSettingsCache,
|
||||
};
|
||||
use reth_trie::MultiProof;
|
||||
use reth_trie_db::ChangesetCache;
|
||||
use reth_trie_parallel::proof_task::{ProofTaskCtx, ProofWorkerHandle};
|
||||
use revm_primitives::{B256, U256};
|
||||
use std::sync::{Arc, OnceLock};
|
||||
use tokio::runtime::{Handle, Runtime};
|
||||
|
||||
/// Get a handle to the test runtime, creating it if necessary
|
||||
fn get_test_runtime_handle() -> Handle {
|
||||
static TEST_RT: OnceLock<Runtime> = OnceLock::new();
|
||||
TEST_RT
|
||||
.get_or_init(|| {
|
||||
tokio::runtime::Builder::new_multi_thread().enable_all().build().unwrap()
|
||||
})
|
||||
.handle()
|
||||
.clone()
|
||||
/// Get a test runtime, creating it if necessary
|
||||
fn get_test_runtime() -> &'static reth_tasks::Runtime {
|
||||
static TEST_RT: OnceLock<reth_tasks::Runtime> = OnceLock::new();
|
||||
TEST_RT.get_or_init(reth_tasks::Runtime::test)
|
||||
}
|
||||
|
||||
fn create_test_state_root_task<F>(factory: F) -> MultiProofTask
|
||||
@@ -1561,16 +1563,17 @@ mod tests {
|
||||
+ PruneCheckpointReader
|
||||
+ ChangeSetReader
|
||||
+ StorageChangeSetReader
|
||||
+ StorageSettingsCache
|
||||
+ BlockNumReader,
|
||||
> + Clone
|
||||
+ Send
|
||||
+ 'static,
|
||||
{
|
||||
let rt_handle = get_test_runtime_handle();
|
||||
let runtime = get_test_runtime();
|
||||
let changeset_cache = ChangesetCache::new();
|
||||
let overlay_factory = OverlayStateProviderFactory::new(factory, changeset_cache);
|
||||
let task_ctx = ProofTaskCtx::new(overlay_factory);
|
||||
let proof_handle = ProofWorkerHandle::new(rt_handle, task_ctx, 1, 1, false);
|
||||
let proof_handle = ProofWorkerHandle::new(runtime, task_ctx, false);
|
||||
let (to_sparse_trie, _receiver) = std::sync::mpsc::channel();
|
||||
let (tx, rx) = crossbeam_channel::unbounded();
|
||||
|
||||
@@ -1580,7 +1583,10 @@ mod tests {
|
||||
fn create_cached_provider<F>(factory: F) -> CachedStateProvider<StateProviderBox>
|
||||
where
|
||||
F: DatabaseProviderFactory<
|
||||
Provider: BlockReader + StageCheckpointReader + PruneCheckpointReader,
|
||||
Provider: BlockReader
|
||||
+ StageCheckpointReader
|
||||
+ PruneCheckpointReader
|
||||
+ reth_provider::StorageSettingsCache,
|
||||
> + Clone
|
||||
+ Send
|
||||
+ 'static,
|
||||
|
||||
@@ -3,12 +3,11 @@
|
||||
use alloy_primitives::B256;
|
||||
use parking_lot::Mutex;
|
||||
use reth_trie_sparse::SparseStateTrie;
|
||||
use reth_trie_sparse_parallel::ParallelSparseTrie;
|
||||
use std::sync::Arc;
|
||||
use tracing::debug;
|
||||
|
||||
/// Type alias for the sparse trie type used in preservation.
|
||||
pub(super) type SparseTrie = SparseStateTrie<ParallelSparseTrie, ParallelSparseTrie>;
|
||||
pub(super) type SparseTrie = SparseStateTrie;
|
||||
|
||||
/// Shared handle to a preserved sparse trie that can be reused across payload validations.
|
||||
///
|
||||
|
||||
@@ -14,8 +14,7 @@
|
||||
use crate::tree::{
|
||||
cached_state::{CachedStateProvider, SavedCache},
|
||||
payload_processor::{
|
||||
bal::{total_slots, BALSlotIter},
|
||||
executor::WorkloadExecutor,
|
||||
bal::{self, total_slots, BALSlotIter},
|
||||
multiproof::{MultiProofMessage, VersionedMultiProofTargets},
|
||||
PayloadExecutionCache,
|
||||
},
|
||||
@@ -24,6 +23,7 @@ use crate::tree::{
|
||||
};
|
||||
use alloy_consensus::transaction::TxHashRef;
|
||||
use alloy_eip7928::BlockAccessList;
|
||||
use alloy_eips::eip4895::Withdrawal;
|
||||
use alloy_evm::Database;
|
||||
use alloy_primitives::{keccak256, map::B256Set, B256};
|
||||
use crossbeam_channel::{Receiver as CrossbeamReceiver, Sender as CrossbeamSender};
|
||||
@@ -36,6 +36,7 @@ use reth_provider::{
|
||||
StateReader,
|
||||
};
|
||||
use reth_revm::{database::StateProviderDatabase, state::EvmState};
|
||||
use reth_tasks::Runtime;
|
||||
use reth_trie::MultiProofTargets;
|
||||
use std::{
|
||||
ops::Range,
|
||||
@@ -48,13 +49,16 @@ use std::{
|
||||
};
|
||||
use tracing::{debug, debug_span, instrument, trace, warn, Span};
|
||||
|
||||
/// Determines the prewarming mode: transaction-based or BAL-based.
|
||||
/// Determines the prewarming mode: transaction-based, BAL-based, or skipped.
|
||||
#[derive(Debug)]
|
||||
pub enum PrewarmMode<Tx> {
|
||||
/// Prewarm by executing transactions from a stream.
|
||||
Transactions(Receiver<Tx>),
|
||||
/// Prewarm by prefetching slots from a Block Access List.
|
||||
BlockAccessList(Arc<BlockAccessList>),
|
||||
/// Transaction prewarming is skipped (e.g. small blocks where the overhead exceeds the
|
||||
/// benefit). No workers are spawned.
|
||||
Skipped,
|
||||
}
|
||||
|
||||
/// A wrapper for transactions that includes their index in the block.
|
||||
@@ -77,7 +81,7 @@ where
|
||||
Evm: ConfigureEvm<Primitives = N>,
|
||||
{
|
||||
/// The executor used to spawn execution tasks.
|
||||
executor: WorkloadExecutor,
|
||||
executor: Runtime,
|
||||
/// Shared execution cache.
|
||||
execution_cache: PayloadExecutionCache,
|
||||
/// Context provided to execution tasks
|
||||
@@ -100,7 +104,7 @@ where
|
||||
{
|
||||
/// Initializes the task with the given transactions pending execution
|
||||
pub fn new(
|
||||
executor: WorkloadExecutor,
|
||||
executor: Runtime,
|
||||
execution_cache: PayloadExecutionCache,
|
||||
ctx: PrewarmContext<N, P, Evm>,
|
||||
to_multi_proof: Option<CrossbeamSender<MultiProofMessage>>,
|
||||
@@ -163,7 +167,7 @@ where
|
||||
};
|
||||
|
||||
// Spawn workers
|
||||
let tx_sender = ctx.clone().spawn_workers(workers_needed, &executor, to_multi_proof, done_tx.clone());
|
||||
let tx_sender = ctx.clone().spawn_workers(workers_needed, &executor, to_multi_proof.clone(), done_tx.clone());
|
||||
|
||||
// Distribute transactions to workers
|
||||
let mut tx_index = 0usize;
|
||||
@@ -187,6 +191,16 @@ where
|
||||
tx_index += 1;
|
||||
}
|
||||
|
||||
// Send withdrawal prefetch targets after all transactions have been distributed
|
||||
if let Some(to_multi_proof) = to_multi_proof
|
||||
&& let Some(withdrawals) = &ctx.env.withdrawals
|
||||
&& !withdrawals.is_empty()
|
||||
{
|
||||
let targets =
|
||||
multiproof_targets_from_withdrawals(withdrawals, ctx.v2_proofs_enabled);
|
||||
let _ = to_multi_proof.send(MultiProofMessage::PrefetchProofs(targets));
|
||||
}
|
||||
|
||||
// drop sender and wait for all tasks to finish
|
||||
drop(done_tx);
|
||||
drop(tx_sender);
|
||||
@@ -276,6 +290,7 @@ where
|
||||
target: "engine::tree::payload_processor::prewarm",
|
||||
"Skipping BAL prewarm - no cache available"
|
||||
);
|
||||
self.send_bal_hashed_state(&bal);
|
||||
let _ =
|
||||
actions_tx.send(PrewarmTaskEvent::FinishedTxExecution { executed_transactions: 0 });
|
||||
return;
|
||||
@@ -291,7 +306,7 @@ where
|
||||
);
|
||||
|
||||
if total_slots == 0 {
|
||||
// No slots to prefetch, signal completion immediately
|
||||
self.send_bal_hashed_state(&bal);
|
||||
let _ =
|
||||
actions_tx.send(PrewarmTaskEvent::FinishedTxExecution { executed_transactions: 0 });
|
||||
return;
|
||||
@@ -336,10 +351,51 @@ where
|
||||
"All BAL prewarm workers completed"
|
||||
);
|
||||
|
||||
// Convert BAL to HashedPostState and send to multiproof task
|
||||
self.send_bal_hashed_state(&bal);
|
||||
|
||||
// Signal that execution has finished
|
||||
let _ = actions_tx.send(PrewarmTaskEvent::FinishedTxExecution { executed_transactions: 0 });
|
||||
}
|
||||
|
||||
/// Converts the BAL to [`HashedPostState`](reth_trie::HashedPostState) and sends it to the
|
||||
/// multiproof task.
|
||||
fn send_bal_hashed_state(&self, bal: &BlockAccessList) {
|
||||
let Some(to_multi_proof) = &self.to_multi_proof else { return };
|
||||
|
||||
let provider = match self.ctx.provider.build() {
|
||||
Ok(provider) => provider,
|
||||
Err(err) => {
|
||||
warn!(
|
||||
target: "engine::tree::payload_processor::prewarm",
|
||||
?err,
|
||||
"Failed to build provider for BAL hashed state conversion"
|
||||
);
|
||||
return;
|
||||
}
|
||||
};
|
||||
|
||||
match bal::bal_to_hashed_post_state(bal, &provider) {
|
||||
Ok(hashed_state) => {
|
||||
debug!(
|
||||
target: "engine::tree::payload_processor::prewarm",
|
||||
accounts = hashed_state.accounts.len(),
|
||||
storages = hashed_state.storages.len(),
|
||||
"Converted BAL to hashed post state"
|
||||
);
|
||||
let _ = to_multi_proof.send(MultiProofMessage::HashedStateUpdate(hashed_state));
|
||||
let _ = to_multi_proof.send(MultiProofMessage::FinishedStateUpdates);
|
||||
}
|
||||
Err(err) => {
|
||||
warn!(
|
||||
target: "engine::tree::payload_processor::prewarm",
|
||||
?err,
|
||||
"Failed to convert BAL to hashed state"
|
||||
);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Executes the task.
|
||||
///
|
||||
/// This will execute the transactions until all transactions have been processed or the task
|
||||
@@ -363,6 +419,10 @@ where
|
||||
PrewarmMode::BlockAccessList(bal) => {
|
||||
self.run_bal_prewarm(bal, actions_tx);
|
||||
}
|
||||
PrewarmMode::Skipped => {
|
||||
let _ = actions_tx
|
||||
.send(PrewarmTaskEvent::FinishedTxExecution { executed_transactions: 0 });
|
||||
}
|
||||
}
|
||||
|
||||
let mut final_execution_outcome = None;
|
||||
@@ -475,11 +535,8 @@ where
|
||||
if let Some(saved_cache) = saved_cache {
|
||||
let caches = saved_cache.cache().clone();
|
||||
let cache_metrics = saved_cache.metrics().clone();
|
||||
state_provider = Box::new(
|
||||
CachedStateProvider::new(state_provider, caches, cache_metrics)
|
||||
// ensure we pre-warm the cache
|
||||
.prewarm(),
|
||||
);
|
||||
state_provider =
|
||||
Box::new(CachedStateProvider::new_prewarm(state_provider, caches, cache_metrics));
|
||||
}
|
||||
|
||||
let state_provider = StateProviderDatabase::new(state_provider);
|
||||
@@ -537,18 +594,11 @@ where
|
||||
return
|
||||
};
|
||||
|
||||
while let Ok(IndexedTransaction { index, tx }) = {
|
||||
let _enter = debug_span!(target: "engine::tree::payload_processor::prewarm", "recv tx")
|
||||
.entered();
|
||||
txs.recv()
|
||||
} {
|
||||
let enter = debug_span!(
|
||||
while let Ok(IndexedTransaction { index, tx }) = txs.recv() {
|
||||
let _enter = debug_span!(
|
||||
target: "engine::tree::payload_processor::prewarm",
|
||||
"prewarm tx",
|
||||
index,
|
||||
tx_hash = %tx.tx().tx_hash(),
|
||||
is_success = tracing::field::Empty,
|
||||
gas_used = tracing::field::Empty,
|
||||
)
|
||||
.entered();
|
||||
|
||||
@@ -579,12 +629,6 @@ where
|
||||
};
|
||||
metrics.execution_duration.record(start.elapsed());
|
||||
|
||||
// record some basic information about the transactions
|
||||
enter.record("gas_used", res.result.gas_used());
|
||||
enter.record("is_success", res.result.is_success());
|
||||
|
||||
drop(enter);
|
||||
|
||||
// If the task was cancelled, stop execution, and exit.
|
||||
if terminate_execution.load(Ordering::Relaxed) {
|
||||
break
|
||||
@@ -593,16 +637,12 @@ where
|
||||
// Only send outcome for transactions after the first txn
|
||||
// as the main execution will be just as fast
|
||||
if index > 0 {
|
||||
let _enter =
|
||||
debug_span!(target: "engine::tree::payload_processor::prewarm", "prewarm outcome", index, tx_hash=%tx.tx().tx_hash())
|
||||
.entered();
|
||||
let (targets, storage_targets) =
|
||||
multiproof_targets_from_state(res.state, v2_proofs_enabled);
|
||||
metrics.prefetch_storage_targets.record(storage_targets as f64);
|
||||
if let Some(to_multi_proof) = &to_multi_proof {
|
||||
let _ = to_multi_proof.send(MultiProofMessage::PrefetchProofs(targets));
|
||||
}
|
||||
drop(_enter);
|
||||
}
|
||||
|
||||
metrics.total_runtime.record(start.elapsed());
|
||||
@@ -618,7 +658,7 @@ where
|
||||
fn spawn_workers<Tx>(
|
||||
self,
|
||||
workers_needed: usize,
|
||||
task_executor: &WorkloadExecutor,
|
||||
task_executor: &Runtime,
|
||||
to_multi_proof: Option<CrossbeamSender<MultiProofMessage>>,
|
||||
done_tx: Sender<()>,
|
||||
) -> CrossbeamSender<IndexedTransaction<Tx>>
|
||||
@@ -655,7 +695,7 @@ where
|
||||
fn spawn_bal_worker(
|
||||
&self,
|
||||
idx: usize,
|
||||
executor: &WorkloadExecutor,
|
||||
executor: &Runtime,
|
||||
bal: Arc<BlockAccessList>,
|
||||
range: Range<usize>,
|
||||
done_tx: Sender<()>,
|
||||
@@ -831,6 +871,27 @@ fn multiproof_targets_v2_from_state(state: EvmState) -> (VersionedMultiProofTarg
|
||||
(VersionedMultiProofTargets::V2(targets), storage_target_count)
|
||||
}
|
||||
|
||||
/// Returns [`VersionedMultiProofTargets`] for withdrawal addresses.
|
||||
///
|
||||
/// Withdrawals only modify account balances (no storage), so the targets contain
|
||||
/// only account-level entries with empty storage sets.
|
||||
fn multiproof_targets_from_withdrawals(
|
||||
withdrawals: &[Withdrawal],
|
||||
v2_enabled: bool,
|
||||
) -> VersionedMultiProofTargets {
|
||||
use reth_trie_parallel::targets_v2::MultiProofTargetsV2;
|
||||
if v2_enabled {
|
||||
VersionedMultiProofTargets::V2(MultiProofTargetsV2 {
|
||||
account_targets: withdrawals.iter().map(|w| keccak256(w.address).into()).collect(),
|
||||
..Default::default()
|
||||
})
|
||||
} else {
|
||||
VersionedMultiProofTargets::Legacy(
|
||||
withdrawals.iter().map(|w| (keccak256(w.address), Default::default())).collect(),
|
||||
)
|
||||
}
|
||||
}
|
||||
|
||||
/// The events the pre-warm task can handle.
|
||||
///
|
||||
/// Generic over `R` (receipt type) to allow sharing `Arc<ExecutionOutcome<R>>` with the main
|
||||
|
||||
@@ -1,6 +1,5 @@
|
||||
//! Sparse Trie task related functionality.
|
||||
|
||||
use super::executor::WorkloadExecutor;
|
||||
use crate::tree::{
|
||||
multiproof::{
|
||||
dispatch_with_chunking, evm_state_to_hashed_post_state, MultiProofMessage,
|
||||
@@ -11,8 +10,9 @@ use crate::tree::{
|
||||
use alloy_primitives::B256;
|
||||
use alloy_rlp::{Decodable, Encodable};
|
||||
use crossbeam_channel::{Receiver as CrossbeamReceiver, Sender as CrossbeamSender};
|
||||
use rayon::iter::{IntoParallelRefMutIterator, ParallelIterator};
|
||||
use rayon::iter::ParallelIterator;
|
||||
use reth_primitives_traits::{Account, ParallelBridgeBuffered};
|
||||
use reth_tasks::Runtime;
|
||||
use reth_trie::{
|
||||
proof_v2::Target, updates::TrieUpdates, DecodedMultiProofV2, HashedPostState, Nibbles,
|
||||
TrieAccount, EMPTY_ROOT_HASH, TRIE_ACCOUNT_RLP_MAX_SIZE,
|
||||
@@ -28,7 +28,7 @@ use reth_trie_parallel::{
|
||||
use reth_trie_sparse::{
|
||||
errors::{SparseStateTrieResult, SparseTrieErrorKind, SparseTrieResult},
|
||||
provider::{TrieNodeProvider, TrieNodeProviderFactory},
|
||||
DeferredDrops, LeafUpdate, SerialSparseTrie, SparseStateTrie, SparseTrie, SparseTrieExt,
|
||||
DeferredDrops, LeafUpdate, ParallelSparseTrie, SparseStateTrie, SparseTrie,
|
||||
};
|
||||
use revm_primitives::{hash_map::Entry, B256Map};
|
||||
use smallvec::SmallVec;
|
||||
@@ -44,8 +44,8 @@ where
|
||||
BPF: TrieNodeProviderFactory + Send + Sync,
|
||||
BPF::AccountNodeProvider: TrieNodeProvider + Send + Sync,
|
||||
BPF::StorageNodeProvider: TrieNodeProvider + Send + Sync,
|
||||
A: SparseTrie + SparseTrieExt + Send + Sync + Default,
|
||||
S: SparseTrie + SparseTrieExt + Send + Sync + Default + Clone,
|
||||
A: SparseTrie + Send + Sync + Default,
|
||||
S: SparseTrie + Send + Sync + Default + Clone,
|
||||
{
|
||||
Cleared(SparseTrieTask<BPF, A, S>),
|
||||
Cached(SparseTrieCacheTask<A, S>),
|
||||
@@ -56,8 +56,8 @@ where
|
||||
BPF: TrieNodeProviderFactory + Send + Sync + Clone,
|
||||
BPF::AccountNodeProvider: TrieNodeProvider + Send + Sync,
|
||||
BPF::StorageNodeProvider: TrieNodeProvider + Send + Sync,
|
||||
A: SparseTrie + SparseTrieExt + Send + Sync + Default,
|
||||
S: SparseTrie + SparseTrieExt + Send + Sync + Default + Clone,
|
||||
A: SparseTrie + Send + Sync + Default,
|
||||
S: SparseTrie + Send + Sync + Default + Clone,
|
||||
{
|
||||
pub(super) fn run(&mut self) -> Result<StateRootComputeOutcome, ParallelStateRootError> {
|
||||
match self {
|
||||
@@ -72,6 +72,7 @@ where
|
||||
max_storage_tries: usize,
|
||||
max_nodes_capacity: usize,
|
||||
max_values_capacity: usize,
|
||||
disable_pruning: bool,
|
||||
) -> (SparseStateTrie<A, S>, DeferredDrops) {
|
||||
match self {
|
||||
Self::Cleared(task) => task.into_cleared_trie(max_nodes_capacity, max_values_capacity),
|
||||
@@ -80,6 +81,7 @@ where
|
||||
max_storage_tries,
|
||||
max_nodes_capacity,
|
||||
max_values_capacity,
|
||||
disable_pruning,
|
||||
),
|
||||
}
|
||||
}
|
||||
@@ -97,7 +99,7 @@ where
|
||||
}
|
||||
|
||||
/// A task responsible for populating the sparse trie.
|
||||
pub(super) struct SparseTrieTask<BPF, A = SerialSparseTrie, S = SerialSparseTrie>
|
||||
pub(super) struct SparseTrieTask<BPF, A = ParallelSparseTrie, S = ParallelSparseTrie>
|
||||
where
|
||||
BPF: TrieNodeProviderFactory + Send + Sync,
|
||||
BPF::AccountNodeProvider: TrieNodeProvider + Send + Sync,
|
||||
@@ -117,8 +119,8 @@ where
|
||||
BPF: TrieNodeProviderFactory + Send + Sync + Clone,
|
||||
BPF::AccountNodeProvider: TrieNodeProvider + Send + Sync,
|
||||
BPF::StorageNodeProvider: TrieNodeProvider + Send + Sync,
|
||||
A: SparseTrie + SparseTrieExt + Send + Sync + Default,
|
||||
S: SparseTrie + SparseTrieExt + Send + Sync + Default + Clone,
|
||||
A: SparseTrie + Send + Sync + Default,
|
||||
S: SparseTrie + Send + Sync + Default + Clone,
|
||||
{
|
||||
/// Creates a new sparse trie task with the given trie.
|
||||
pub(super) const fn new(
|
||||
@@ -212,7 +214,7 @@ where
|
||||
const MAX_PENDING_UPDATES: usize = 100;
|
||||
|
||||
/// Sparse trie task implementation that uses in-memory sparse trie data to schedule proof fetching.
|
||||
pub(super) struct SparseTrieCacheTask<A = SerialSparseTrie, S = SerialSparseTrie> {
|
||||
pub(super) struct SparseTrieCacheTask<A = ParallelSparseTrie, S = ParallelSparseTrie> {
|
||||
/// Sender for proof results.
|
||||
proof_result_tx: CrossbeamSender<ProofResultMessage>,
|
||||
/// Receiver for proof results directly from workers.
|
||||
@@ -277,12 +279,12 @@ pub(super) struct SparseTrieCacheTask<A = SerialSparseTrie, S = SerialSparseTrie
|
||||
|
||||
impl<A, S> SparseTrieCacheTask<A, S>
|
||||
where
|
||||
A: SparseTrieExt + Default,
|
||||
S: SparseTrieExt + Default + Clone,
|
||||
A: SparseTrie + Default,
|
||||
S: SparseTrie + Default + Clone,
|
||||
{
|
||||
/// Creates a new sparse trie, pre-populating with an existing [`SparseStateTrie`].
|
||||
pub(super) fn new_with_trie(
|
||||
executor: &WorkloadExecutor,
|
||||
executor: &Runtime,
|
||||
updates: CrossbeamReceiver<MultiProofMessage>,
|
||||
proof_worker_handle: ProofWorkerHandle,
|
||||
metrics: MultiProofTaskMetrics,
|
||||
@@ -343,6 +345,9 @@ where
|
||||
MultiProofMessage::EmptyProof { .. } | MultiProofMessage::BlockAccessList(_) => {
|
||||
continue
|
||||
}
|
||||
MultiProofMessage::HashedStateUpdate(state) => {
|
||||
SparseTrieTaskMessage::HashedState(state)
|
||||
}
|
||||
};
|
||||
if hashed_state_tx.send(msg).is_err() {
|
||||
break;
|
||||
@@ -353,16 +358,23 @@ where
|
||||
/// Prunes and shrinks the trie for reuse in the next payload built on top of this one.
|
||||
///
|
||||
/// Should be called after the state root result has been sent.
|
||||
///
|
||||
/// When `disable_pruning` is true, the trie is preserved without any node pruning,
|
||||
/// storage trie eviction, or capacity shrinking, keeping the full cache intact for
|
||||
/// benchmarking purposes.
|
||||
pub(super) fn into_trie_for_reuse(
|
||||
self,
|
||||
prune_depth: usize,
|
||||
max_storage_tries: usize,
|
||||
max_nodes_capacity: usize,
|
||||
max_values_capacity: usize,
|
||||
disable_pruning: bool,
|
||||
) -> (SparseStateTrie<A, S>, DeferredDrops) {
|
||||
let Self { mut trie, .. } = self;
|
||||
trie.prune(prune_depth, max_storage_tries);
|
||||
trie.shrink_to(max_nodes_capacity, max_values_capacity);
|
||||
if !disable_pruning {
|
||||
trie.prune(prune_depth, max_storage_tries);
|
||||
trie.shrink_to(max_nodes_capacity, max_values_capacity);
|
||||
}
|
||||
let deferred = trie.take_deferred_drops();
|
||||
(trie, deferred)
|
||||
}
|
||||
@@ -404,7 +416,9 @@ where
|
||||
let update = match message {
|
||||
Ok(m) => m,
|
||||
Err(_) => {
|
||||
break
|
||||
return Err(ParallelStateRootError::Other(
|
||||
"updates channel disconnected before state root calculation".to_string(),
|
||||
))
|
||||
}
|
||||
};
|
||||
|
||||
@@ -483,7 +497,7 @@ where
|
||||
}
|
||||
|
||||
#[instrument(
|
||||
level = "debug",
|
||||
level = "trace",
|
||||
target = "engine::tree::payload_processor::sparse_trie",
|
||||
skip_all
|
||||
)]
|
||||
@@ -515,7 +529,7 @@ where
|
||||
|
||||
/// Processes a hashed state update and encodes all state changes as trie updates.
|
||||
#[instrument(
|
||||
level = "debug",
|
||||
level = "trace",
|
||||
target = "engine::tree::payload_processor::sparse_trie",
|
||||
skip_all
|
||||
)]
|
||||
@@ -675,6 +689,11 @@ where
|
||||
/// Invokes `update_leaves` for the accounts trie and collects any new targets.
|
||||
///
|
||||
/// Returns whether any updates were drained (applied to the trie).
|
||||
#[instrument(
|
||||
level = "debug",
|
||||
target = "engine::tree::payload_processor::sparse_trie",
|
||||
skip_all
|
||||
)]
|
||||
fn process_account_leaf_updates(&mut self, new: bool) -> SparseTrieResult<bool> {
|
||||
let account_updates =
|
||||
if new { &mut self.new_account_updates } else { &mut self.account_updates };
|
||||
@@ -718,53 +737,50 @@ where
|
||||
return Ok(());
|
||||
}
|
||||
|
||||
let span = tracing::Span::current();
|
||||
let roots = self
|
||||
let span = debug_span!("compute_storage_roots").entered();
|
||||
self
|
||||
.trie
|
||||
.storage_tries_mut()
|
||||
.par_iter_mut()
|
||||
.filter(|(address, _)| {
|
||||
self.storage_updates.get(*address).is_some_and(|updates| updates.is_empty())
|
||||
.iter_mut()
|
||||
.filter(|(address, trie)| {
|
||||
self.storage_updates.get(*address).is_some_and(|updates| updates.is_empty()) &&
|
||||
!trie.is_root_cached()
|
||||
})
|
||||
.map(|(address, trie)| {
|
||||
.par_bridge_buffered()
|
||||
.for_each(|(address, trie)| {
|
||||
let _enter = debug_span!(target: "engine::tree::payload_processor::sparse_trie", parent: &span, "storage root", ?address).entered();
|
||||
let root =
|
||||
trie.root().expect("updates are drained, trie should be revealed by now");
|
||||
|
||||
(address, root)
|
||||
})
|
||||
.collect::<Vec<_>>();
|
||||
|
||||
for (addr, storage_root) in roots {
|
||||
// If the storage root is known and we have a pending update for this account, encode it
|
||||
// into a proper update.
|
||||
if let Entry::Occupied(entry) = self.pending_account_updates.entry(*addr) &&
|
||||
entry.get().is_some()
|
||||
{
|
||||
let account = entry.remove().expect("just checked, should be Some");
|
||||
let encoded = if account.is_none_or(|account| account.is_empty()) &&
|
||||
storage_root == EMPTY_ROOT_HASH
|
||||
{
|
||||
Vec::new()
|
||||
} else {
|
||||
self.account_rlp_buf.clear();
|
||||
account
|
||||
.unwrap_or_default()
|
||||
.into_trie_account(storage_root)
|
||||
.encode(&mut self.account_rlp_buf);
|
||||
self.account_rlp_buf.clone()
|
||||
};
|
||||
self.account_updates.insert(*addr, LeafUpdate::Changed(encoded));
|
||||
}
|
||||
}
|
||||
trie.root().expect("updates are drained, trie should be revealed by now");
|
||||
});
|
||||
drop(span);
|
||||
|
||||
loop {
|
||||
let span = debug_span!("promote_updates", promoted = tracing::field::Empty).entered();
|
||||
// Now handle pending account updates that can be upgraded to a proper update.
|
||||
let account_rlp_buf = &mut self.account_rlp_buf;
|
||||
let mut num_promoted = 0;
|
||||
self.pending_account_updates.retain(|addr, account| {
|
||||
// If account has pending storage updates, it is still pending.
|
||||
if self.storage_updates.get(addr).is_some_and(|updates| !updates.is_empty()) {
|
||||
return true;
|
||||
if let Some(updates) = self.storage_updates.get(addr) {
|
||||
if !updates.is_empty() {
|
||||
// If account has pending storage updates, it is still pending.
|
||||
return true;
|
||||
} else if let Some(account) = account.take() {
|
||||
let storage_root = self.trie.storage_root(addr).expect("updates are drained, storage trie should be revealed by now");
|
||||
let encoded = if account.is_none_or(|account| account.is_empty()) &&
|
||||
storage_root == EMPTY_ROOT_HASH
|
||||
{
|
||||
Vec::new()
|
||||
} else {
|
||||
account_rlp_buf.clear();
|
||||
account
|
||||
.unwrap_or_default()
|
||||
.into_trie_account(storage_root)
|
||||
.encode(account_rlp_buf);
|
||||
account_rlp_buf.clone()
|
||||
};
|
||||
self.account_updates.insert(*addr, LeafUpdate::Changed(encoded));
|
||||
num_promoted += 1;
|
||||
return false;
|
||||
}
|
||||
}
|
||||
|
||||
// Get the current account state either from the trie or from latest account update.
|
||||
@@ -799,15 +815,18 @@ where
|
||||
account_rlp_buf.clone()
|
||||
};
|
||||
self.account_updates.insert(*addr, LeafUpdate::Changed(encoded));
|
||||
num_promoted += 1;
|
||||
|
||||
false
|
||||
});
|
||||
span.record("promoted", num_promoted);
|
||||
drop(span);
|
||||
|
||||
// Only exit when no new updates are processed.
|
||||
//
|
||||
// We need to keep iterating if any updates are being drained because that might
|
||||
// indicate that more pending account updates can be promoted.
|
||||
if !self.process_account_leaf_updates(false)? {
|
||||
if num_promoted == 0 || !self.process_account_leaf_updates(false)? {
|
||||
break
|
||||
}
|
||||
}
|
||||
@@ -1024,3 +1043,59 @@ where
|
||||
|
||||
Ok(elapsed)
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::*;
|
||||
use alloy_primitives::{keccak256, Address, U256};
|
||||
use reth_trie_sparse::ParallelSparseTrie;
|
||||
|
||||
#[test]
|
||||
fn test_run_hashing_task_hashed_state_update_forwards() {
|
||||
let (updates_tx, updates_rx) = crossbeam_channel::unbounded();
|
||||
let (hashed_state_tx, hashed_state_rx) = crossbeam_channel::unbounded();
|
||||
|
||||
let address = keccak256(Address::random());
|
||||
let slot = keccak256(U256::from(42).to_be_bytes::<32>());
|
||||
let value = U256::from(999);
|
||||
|
||||
let mut hashed_state = HashedPostState::default();
|
||||
hashed_state.accounts.insert(
|
||||
address,
|
||||
Some(Account { balance: U256::from(100), nonce: 1, bytecode_hash: None }),
|
||||
);
|
||||
let mut storage = reth_trie::HashedStorage::new(false);
|
||||
storage.storage.insert(slot, value);
|
||||
hashed_state.storages.insert(address, storage);
|
||||
|
||||
let expected_state = hashed_state.clone();
|
||||
|
||||
let handle = std::thread::spawn(move || {
|
||||
SparseTrieCacheTask::<ParallelSparseTrie, ParallelSparseTrie>::run_hashing_task(
|
||||
updates_rx,
|
||||
hashed_state_tx,
|
||||
);
|
||||
});
|
||||
|
||||
updates_tx.send(MultiProofMessage::HashedStateUpdate(hashed_state)).unwrap();
|
||||
updates_tx.send(MultiProofMessage::FinishedStateUpdates).unwrap();
|
||||
drop(updates_tx);
|
||||
|
||||
let SparseTrieTaskMessage::HashedState(received) = hashed_state_rx.recv().unwrap() else {
|
||||
panic!("expected HashedState message");
|
||||
};
|
||||
|
||||
let account = received.accounts.get(&address).unwrap().unwrap();
|
||||
assert_eq!(account.balance, expected_state.accounts[&address].unwrap().balance);
|
||||
assert_eq!(account.nonce, expected_state.accounts[&address].unwrap().nonce);
|
||||
|
||||
let storage = received.storages.get(&address).unwrap();
|
||||
assert_eq!(*storage.storage.get(&slot).unwrap(), value);
|
||||
|
||||
let second = hashed_state_rx.recv().unwrap();
|
||||
assert!(matches!(second, SparseTrieTaskMessage::FinishedStateUpdates));
|
||||
|
||||
assert!(hashed_state_rx.recv().is_err());
|
||||
handle.join().unwrap();
|
||||
}
|
||||
}
|
||||
|
||||
Some files were not shown because too many files have changed in this diff Show More
Reference in New Issue
Block a user