mirror of
https://github.com/paradigmxyz/reth.git
synced 2026-04-30 03:01:58 -04:00
Compare commits
118 Commits
feat/proof
...
push
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
d6324d63e2 | ||
|
|
5f3ade1bfe | ||
|
|
b053f6fafe | ||
|
|
2a58e7a077 | ||
|
|
793a3d5fb3 | ||
|
|
89ae1af694 | ||
|
|
9c33fb5d45 | ||
|
|
bef3d7b4d1 | ||
|
|
e918c17af9 | ||
|
|
fcc170d53c | ||
|
|
c685842ba2 | ||
|
|
564ffa5868 | ||
|
|
12891dd171 | ||
|
|
c1015022f5 | ||
|
|
e3fe6326bc | ||
|
|
e3d520b24f | ||
|
|
9f29939ea1 | ||
|
|
10881d1c73 | ||
|
|
408593467b | ||
|
|
8caf8cdf11 | ||
|
|
1e8030ef28 | ||
|
|
f72c503d6f | ||
|
|
42890e6e7f | ||
|
|
e30e441ada | ||
|
|
121160d248 | ||
|
|
7ff78ca082 | ||
|
|
d7f56d509c | ||
|
|
3300e404cf | ||
|
|
77cb99fc78 | ||
|
|
66169c7e7c | ||
|
|
4f5fafc8f3 | ||
|
|
0b8e6c6ed3 | ||
|
|
4a62d38af2 | ||
|
|
dc4f249f09 | ||
|
|
c915841a45 | ||
|
|
217a337d8c | ||
|
|
74d57008b6 | ||
|
|
f8767bc678 | ||
|
|
81c83bba68 | ||
|
|
cd8ec58703 | ||
|
|
931b17c3fd | ||
|
|
807d328cf0 | ||
|
|
8a6bbd29fe | ||
|
|
8bedaaee71 | ||
|
|
09cd105671 | ||
|
|
a0b60b7e64 | ||
|
|
90e15d096d | ||
|
|
a161ca294f | ||
|
|
3a5c41e3da | ||
|
|
968d3c9534 | ||
|
|
fc6666f6a7 | ||
|
|
ff3a854326 | ||
|
|
04543ed16b | ||
|
|
ae3f0d4d1a | ||
|
|
5bccdc4a5d | ||
|
|
0b7cd60668 | ||
|
|
aa983b49af | ||
|
|
2aff617767 | ||
|
|
2c5d00ffb5 | ||
|
|
e2a3527414 | ||
|
|
e4cb3d3aed | ||
|
|
079b7b9d57 | ||
|
|
8a25d7d3cf | ||
|
|
a5ced84098 | ||
|
|
59760a2fe3 | ||
|
|
b9d21f293e | ||
|
|
dec1cad318 | ||
|
|
165b94c3fa | ||
|
|
69e4c06ae7 | ||
|
|
1406a984a7 | ||
|
|
93d6b9782c | ||
|
|
68e4ff1f7d | ||
|
|
33467ea6dd | ||
|
|
3bf9280b3c | ||
|
|
5c93986e6d | ||
|
|
779e0eb8bb | ||
|
|
5c4163c177 | ||
|
|
c5d1f70dd3 | ||
|
|
a8ec78fc87 | ||
|
|
1ecbb0b9d6 | ||
|
|
a40647e651 | ||
|
|
b25b8c00ee | ||
|
|
b20a99e1c9 | ||
|
|
9ec0e3cd51 | ||
|
|
09837bbdb4 | ||
|
|
198e457a12 | ||
|
|
c727c61101 | ||
|
|
366857559b | ||
|
|
ccd15e8a25 | ||
|
|
67f89fa4b2 | ||
|
|
a87510069d | ||
|
|
b3fe168548 | ||
|
|
8d7583b6fb | ||
|
|
32466fe223 | ||
|
|
f2061991c5 | ||
|
|
a549b4d66d | ||
|
|
cdcea2bd33 | ||
|
|
3898cc5c3d | ||
|
|
c558c1d10f | ||
|
|
5f7ecc6187 | ||
|
|
15b6e7f6fc | ||
|
|
503b9b87a6 | ||
|
|
600eab20a5 | ||
|
|
a7eef9c6dc | ||
|
|
6aebf8c064 | ||
|
|
655a463c18 | ||
|
|
a8b9c9a9dc | ||
|
|
7679625fd3 | ||
|
|
7ac0d542b6 | ||
|
|
e4b2b1edf3 | ||
|
|
95ed377135 | ||
|
|
db01c10a1d | ||
|
|
b9d7744389 | ||
|
|
e72e85632b | ||
|
|
8033b77ad3 | ||
|
|
1fe5623f78 | ||
|
|
887421cef2 | ||
|
|
352430cd84 |
5
.changelog/bold-frogs-run.md
Normal file
5
.changelog/bold-frogs-run.md
Normal file
@@ -0,0 +1,5 @@
|
||||
---
|
||||
reth-transaction-pool: patch
|
||||
---
|
||||
|
||||
Renamed and documented validation methods for clarity. `validate_one_no_state` and `validate_one_against_state` are now public methods `validate_stateless` and `validate_stateful` with improved documentation explaining their respective validation phases.
|
||||
5
.changelog/dry-ducks-write.md
Normal file
5
.changelog/dry-ducks-write.md
Normal file
@@ -0,0 +1,5 @@
|
||||
---
|
||||
reth-network: minor
|
||||
---
|
||||
|
||||
Added reason label to backed_off_peers metric. The metric now tracks backed off peers by reason (too_many_peers, graceful_close, connection_error) to improve observability.
|
||||
5
.changelog/eager-mules-fold.md
Normal file
5
.changelog/eager-mules-fold.md
Normal file
@@ -0,0 +1,5 @@
|
||||
---
|
||||
reth-trie-sparse-parallel: patch
|
||||
---
|
||||
|
||||
Fixed parallel sparse trie to skip revealing disconnected leaves by checking parent branch reachability before inserting leaf nodes.
|
||||
5
.changelog/easy-clouds-meow.md
Normal file
5
.changelog/easy-clouds-meow.md
Normal file
@@ -0,0 +1,5 @@
|
||||
---
|
||||
ef-tests: patch
|
||||
---
|
||||
|
||||
Removed reth-stateless crate and stateless validation from ef-tests.
|
||||
6
.changelog/evil-pigs-cry.md
Normal file
6
.changelog/evil-pigs-cry.md
Normal file
@@ -0,0 +1,6 @@
|
||||
---
|
||||
reth-engine-tree: patch
|
||||
reth-trie-sparse-parallel: patch
|
||||
---
|
||||
|
||||
Added tracing spans and debug logs to sparse trie operations for better observability during parallel state root computation.
|
||||
6
.changelog/fair-winds-growl.md
Normal file
6
.changelog/fair-winds-growl.md
Normal file
@@ -0,0 +1,6 @@
|
||||
---
|
||||
reth-exex: patch
|
||||
reth-exex-types: patch
|
||||
---
|
||||
|
||||
Added configurable backfill thresholds to ExEx notifications stream and added regression tests for state provider parity between pipeline and backfill execution paths.
|
||||
4
.changelog/fast-fish-cry.md
Normal file
4
.changelog/fast-fish-cry.md
Normal file
@@ -0,0 +1,4 @@
|
||||
---
|
||||
---
|
||||
|
||||
Added WebSocket subscription integration tests for eth_subscribe.
|
||||
4
.changelog/fast-waves-smile.md
Normal file
4
.changelog/fast-waves-smile.md
Normal file
@@ -0,0 +1,4 @@
|
||||
---
|
||||
---
|
||||
|
||||
Improved nightly Docker build failure Slack notification with more detailed formatting and context.
|
||||
7
.changelog/icy-lions-slide.md
Normal file
7
.changelog/icy-lions-slide.md
Normal file
@@ -0,0 +1,7 @@
|
||||
---
|
||||
reth: patch
|
||||
reth-cli-commands: patch
|
||||
reth-node-core: patch
|
||||
---
|
||||
|
||||
Removed experimental ress protocol support for stateless Ethereum nodes.
|
||||
5
.changelog/lazy-lakes-shout.md
Normal file
5
.changelog/lazy-lakes-shout.md
Normal file
@@ -0,0 +1,5 @@
|
||||
---
|
||||
reth-node-builder: patch
|
||||
---
|
||||
|
||||
Removed biased select in engine service loop to allow fair scheduling of shutdown requests alongside event processing.
|
||||
5
.changelog/lively-clams-drink.md
Normal file
5
.changelog/lively-clams-drink.md
Normal file
@@ -0,0 +1,5 @@
|
||||
---
|
||||
reth-transaction-pool: patch
|
||||
---
|
||||
|
||||
Fixed swapped arguments in `blob_tx_priority` function calls, correcting the parameter order to match the function signature.
|
||||
4
.changelog/lively-clouds-bake.md
Normal file
4
.changelog/lively-clouds-bake.md
Normal file
@@ -0,0 +1,4 @@
|
||||
---
|
||||
---
|
||||
|
||||
Improved documentation overview page with better structure and clarity.
|
||||
5
.changelog/lively-foxes-play.md
Normal file
5
.changelog/lively-foxes-play.md
Normal file
@@ -0,0 +1,5 @@
|
||||
---
|
||||
reth-node-events: patch
|
||||
---
|
||||
|
||||
Updated consensus engine log message to be more accurate about received updates.
|
||||
6
.changelog/mild-foxes-bark.md
Normal file
6
.changelog/mild-foxes-bark.md
Normal file
@@ -0,0 +1,6 @@
|
||||
---
|
||||
reth-chainspec: minor
|
||||
reth-network-peers: minor
|
||||
---
|
||||
|
||||
Removed OP stack bootnodes from default chain configurations and network peers module.
|
||||
9
.changelog/nice-trees-drink.md
Normal file
9
.changelog/nice-trees-drink.md
Normal file
@@ -0,0 +1,9 @@
|
||||
---
|
||||
reth-network-api: minor
|
||||
reth-network-types: minor
|
||||
reth-network: minor
|
||||
reth-node-core: minor
|
||||
reth: minor
|
||||
---
|
||||
|
||||
Added optional ENR fork ID enforcement to filter out peers from incompatible networks during peer discovery, controlled by the `--enforce-enr-fork-id` CLI flag.
|
||||
5
.changelog/nice-waves-bow.md
Normal file
5
.changelog/nice-waves-bow.md
Normal file
@@ -0,0 +1,5 @@
|
||||
---
|
||||
reth-primitives: patch
|
||||
---
|
||||
|
||||
Moved feature-referenced dependencies from dev-dependencies to optional dependencies to ensure they are available when their corresponding features are enabled.
|
||||
5
.changelog/old-dogs-shake.md
Normal file
5
.changelog/old-dogs-shake.md
Normal file
@@ -0,0 +1,5 @@
|
||||
---
|
||||
reth-transaction-pool: minor
|
||||
---
|
||||
|
||||
Added `IntoIter: Send` bounds to `validate_transactions` and `validate_transactions_with_origin` in the `TransactionValidator` trait, avoiding unnecessary `Vec` collects. Simplified default `validate_transactions_with_origin` to delegate to `validate_transactions`.
|
||||
5
.changelog/quiet-frogs-whisper.md
Normal file
5
.changelog/quiet-frogs-whisper.md
Normal file
@@ -0,0 +1,5 @@
|
||||
---
|
||||
reth-provider: patch
|
||||
---
|
||||
|
||||
Removed unused staging types from ProviderFactoryBuilder.
|
||||
5
.changelog/swift-owls-fly.md
Normal file
5
.changelog/swift-owls-fly.md
Normal file
@@ -0,0 +1,5 @@
|
||||
---
|
||||
reth-trie-sparse: minor
|
||||
---
|
||||
|
||||
Removed `SerialSparseTrie` from the workspace, consolidating on `ParallelSparseTrie` as the single sparse trie implementation in `reth-trie-sparse`.
|
||||
5
.changelog/tidy-stars-cry.md
Normal file
5
.changelog/tidy-stars-cry.md
Normal file
@@ -0,0 +1,5 @@
|
||||
---
|
||||
reth-trie-sparse: patch
|
||||
---
|
||||
|
||||
Fixed a bug where trie nodes could appear in both `updated_nodes` and `removed_nodes` simultaneously by removing entries from `removed_nodes` when a node is inserted as updated.
|
||||
4
.changelog/vast-waves-fold.md
Normal file
4
.changelog/vast-waves-fold.md
Normal file
@@ -0,0 +1,4 @@
|
||||
---
|
||||
---
|
||||
|
||||
Expanded CLI integration tests with subcommand help coverage, config TOML validation, genesis JSON validation, and send transaction round-trip test for dev mode.
|
||||
5
.changelog/warm-foxes-glow.md
Normal file
5
.changelog/warm-foxes-glow.md
Normal file
@@ -0,0 +1,5 @@
|
||||
---
|
||||
reth-network: minor
|
||||
---
|
||||
|
||||
Added direction labels to `closed_sessions` and `pending_session_failures` metrics. Operators can now distinguish session closures and failures by direction (`active`, `incoming_pending`, `outgoing_pending` for closed sessions; `inbound`, `outbound` for pending session failures).
|
||||
4
.changelog/warm-foxes-jump.md
Normal file
4
.changelog/warm-foxes-jump.md
Normal file
@@ -0,0 +1,4 @@
|
||||
---
|
||||
---
|
||||
|
||||
Moved Kurtosis CI failure notifications to the hive Slack channel.
|
||||
7
.changelog/warm-geese-build.md
Normal file
7
.changelog/warm-geese-build.md
Normal file
@@ -0,0 +1,7 @@
|
||||
---
|
||||
reth-rpc-api: minor
|
||||
reth-rpc-builder: patch
|
||||
reth-rpc: minor
|
||||
---
|
||||
|
||||
Added `subscribeFinalizedChainNotifications` RPC endpoint that buffers committed chain notifications and emits them once a new finalized block is received.
|
||||
5
.changelog/zesty-clouds-wave.md
Normal file
5
.changelog/zesty-clouds-wave.md
Normal file
@@ -0,0 +1,5 @@
|
||||
---
|
||||
reth-trie: patch
|
||||
---
|
||||
|
||||
Fixed a potential panic in `ProofCalculator` by clearing internal computation state (`branch_stack`, `child_stack`, `branch_path`, etc.) after errors, preventing stale state from causing `usize` underflow panics when the calculator is reused. Added a test verifying correct behavior after simulated mid-computation errors.
|
||||
2
.github/CODEOWNERS
vendored
2
.github/CODEOWNERS
vendored
@@ -38,7 +38,7 @@ crates/storage/libmdbx-rs/ @shekhirin
|
||||
crates/storage/nippy-jar/ @joshieDo @shekhirin
|
||||
crates/storage/provider/ @joshieDo @shekhirin @yongkangc
|
||||
crates/storage/storage-api/ @joshieDo
|
||||
crates/tasks/ @mattsse
|
||||
crates/tasks/ @mattsse @DaniPopes
|
||||
crates/tokio-util/ @mattsse
|
||||
crates/tracing/ @mattsse @shekhirin
|
||||
crates/tracing-otlp/ @mattsse @Rjected
|
||||
|
||||
51
.github/scripts/check_rv32imac.sh
vendored
51
.github/scripts/check_rv32imac.sh
vendored
@@ -1,7 +1,6 @@
|
||||
#!/usr/bin/env bash
|
||||
set +e # Disable immediate exit on error
|
||||
set -uo pipefail
|
||||
|
||||
# Array of crates to check
|
||||
crates_to_check=(
|
||||
reth-codecs-derive
|
||||
reth-primitives
|
||||
@@ -28,54 +27,22 @@ crates_to_check=(
|
||||
reth-ethereum-forks
|
||||
reth-ethereum-primitives
|
||||
reth-ethereum-consensus
|
||||
reth-stateless
|
||||
)
|
||||
|
||||
# Array to hold the results
|
||||
results=()
|
||||
# Flag to track if any command fails
|
||||
any_failed=0
|
||||
tmpdir=$(mktemp -d 2>/dev/null || mktemp -d -t reth-check)
|
||||
trap 'rm -rf -- "$tmpdir"' EXIT INT TERM
|
||||
|
||||
for crate in "${crates_to_check[@]}"; do
|
||||
cmd="cargo +stable build -p $crate --target riscv32imac-unknown-none-elf --no-default-features"
|
||||
|
||||
if [ -n "$CI" ]; then
|
||||
echo "::group::$cmd"
|
||||
outfile="$tmpdir/$crate.log"
|
||||
if cargo +stable build -p "$crate" --target riscv32imac-unknown-none-elf --no-default-features --color never >"$outfile" 2>&1; then
|
||||
echo "✅ $crate"
|
||||
else
|
||||
printf "\n%s:\n %s\n" "$crate" "$cmd"
|
||||
fi
|
||||
|
||||
set +e # Disable immediate exit on error
|
||||
# Run the command and capture the return code
|
||||
$cmd
|
||||
ret_code=$?
|
||||
set -e # Re-enable immediate exit on error
|
||||
|
||||
# Store the result in the dictionary
|
||||
if [ $ret_code -eq 0 ]; then
|
||||
results+=("1:✅:$crate")
|
||||
else
|
||||
results+=("2:❌:$crate")
|
||||
echo "❌ $crate"
|
||||
sed 's/^/ /' "$outfile"
|
||||
echo ""
|
||||
any_failed=1
|
||||
fi
|
||||
|
||||
if [ -n "$CI" ]; then
|
||||
echo "::endgroup::"
|
||||
fi
|
||||
done
|
||||
|
||||
# Sort the results by status and then by crate name
|
||||
IFS=$'\n' sorted_results=($(sort <<<"${results[*]}"))
|
||||
unset IFS
|
||||
|
||||
# Print summary
|
||||
echo -e "\nSummary of build results:"
|
||||
for result in "${sorted_results[@]}"; do
|
||||
status="${result#*:}"
|
||||
status="${status%%:*}"
|
||||
crate="${result##*:}"
|
||||
echo "$status $crate"
|
||||
done
|
||||
|
||||
# Exit with a non-zero status if any command fails
|
||||
exit $any_failed
|
||||
|
||||
71
.github/scripts/check_wasm.sh
vendored
71
.github/scripts/check_wasm.sh
vendored
@@ -1,11 +1,10 @@
|
||||
#!/usr/bin/env bash
|
||||
set +e # Disable immediate exit on error
|
||||
set -uo pipefail
|
||||
|
||||
# Array of crates to compile
|
||||
crates=($(cargo metadata --format-version=1 --no-deps | jq -r '.packages[].name' | grep '^reth' | sort))
|
||||
readarray -t crates < <(
|
||||
cargo metadata --format-version=1 --no-deps | jq -r '.packages[].name' | grep '^reth' | sort
|
||||
)
|
||||
|
||||
# Array of crates to exclude
|
||||
# Used with the `contains` function.
|
||||
# shellcheck disable=SC2034
|
||||
exclude_crates=(
|
||||
# The following require investigation if they can be fixed
|
||||
@@ -64,6 +63,7 @@ exclude_crates=(
|
||||
reth-provider # tokio
|
||||
reth-prune # tokio
|
||||
reth-prune-static-files # reth-provider
|
||||
reth-tasks # tokio rt-multi-thread
|
||||
reth-stages-api # reth-provider, reth-prune
|
||||
reth-static-file # tokio
|
||||
reth-transaction-pool # c-kzg
|
||||
@@ -77,70 +77,35 @@ exclude_crates=(
|
||||
reth-node-ethstats
|
||||
)
|
||||
|
||||
# Array to hold the results
|
||||
results=()
|
||||
# Flag to track if any command fails
|
||||
any_failed=0
|
||||
tmpdir=$(mktemp -d 2>/dev/null || mktemp -d -t reth-check)
|
||||
trap 'rm -rf -- "$tmpdir"' EXIT INT TERM
|
||||
|
||||
# Function to check if a value exists in an array
|
||||
contains() {
|
||||
local array="$1[@]"
|
||||
local seeking=$2
|
||||
local in=1
|
||||
local seeking="$2"
|
||||
local element
|
||||
for element in "${!array}"; do
|
||||
if [[ "$element" == "$seeking" ]]; then
|
||||
in=0
|
||||
break
|
||||
fi
|
||||
[[ "$element" == "$seeking" ]] && return 0
|
||||
done
|
||||
return $in
|
||||
return 1
|
||||
}
|
||||
|
||||
for crate in "${crates[@]}"; do
|
||||
if contains exclude_crates "$crate"; then
|
||||
results+=("3:⏭️:$crate")
|
||||
echo "⏭️ $crate"
|
||||
continue
|
||||
fi
|
||||
|
||||
cmd="cargo +stable build -p $crate --target wasm32-wasip1 --no-default-features"
|
||||
|
||||
if [ -n "$CI" ]; then
|
||||
echo "::group::$cmd"
|
||||
outfile="$tmpdir/$crate.log"
|
||||
if cargo +stable build -p "$crate" --target wasm32-wasip1 --no-default-features --color never >"$outfile" 2>&1; then
|
||||
echo "✅ $crate"
|
||||
else
|
||||
printf "\n%s:\n %s\n" "$crate" "$cmd"
|
||||
fi
|
||||
|
||||
set +e # Disable immediate exit on error
|
||||
# Run the command and capture the return code
|
||||
$cmd
|
||||
ret_code=$?
|
||||
set -e # Re-enable immediate exit on error
|
||||
|
||||
# Store the result in the dictionary
|
||||
if [ $ret_code -eq 0 ]; then
|
||||
results+=("1:✅:$crate")
|
||||
else
|
||||
results+=("2:❌:$crate")
|
||||
echo "❌ $crate"
|
||||
sed 's/^/ /' "$outfile"
|
||||
echo ""
|
||||
any_failed=1
|
||||
fi
|
||||
|
||||
if [ -n "$CI" ]; then
|
||||
echo "::endgroup::"
|
||||
fi
|
||||
done
|
||||
|
||||
# Sort the results by status and then by crate name
|
||||
IFS=$'\n' sorted_results=($(sort <<<"${results[*]}"))
|
||||
unset IFS
|
||||
|
||||
# Print summary
|
||||
echo -e "\nSummary of build results:"
|
||||
for result in "${sorted_results[@]}"; do
|
||||
status="${result#*:}"
|
||||
status="${status%%:*}"
|
||||
crate="${result##*:}"
|
||||
echo "$status $crate"
|
||||
done
|
||||
|
||||
# Exit with a non-zero status if any command fails
|
||||
exit $any_failed
|
||||
|
||||
1
.github/workflows/dependencies.yml
vendored
1
.github/workflows/dependencies.yml
vendored
@@ -15,6 +15,7 @@ permissions:
|
||||
|
||||
jobs:
|
||||
update:
|
||||
if: github.repository == 'paradigmxyz/reth'
|
||||
uses: tempoxyz/ci/.github/workflows/cargo-update-pr.yml@main
|
||||
secrets:
|
||||
token: ${{ secrets.GITHUB_TOKEN }}
|
||||
53
.github/workflows/docker-test.yml
vendored
Normal file
53
.github/workflows/docker-test.yml
vendored
Normal file
@@ -0,0 +1,53 @@
|
||||
name: Build test Docker image
|
||||
|
||||
on:
|
||||
workflow_call:
|
||||
inputs:
|
||||
hive_target:
|
||||
required: true
|
||||
type: string
|
||||
description: "Docker bake target to build (e.g. hive-stable, hive-edge)"
|
||||
artifact_name:
|
||||
required: false
|
||||
type: string
|
||||
default: "artifacts"
|
||||
description: "Name for the uploaded artifact"
|
||||
|
||||
jobs:
|
||||
build:
|
||||
if: github.repository == 'paradigmxyz/reth'
|
||||
timeout-minutes: 45
|
||||
runs-on: ubuntu-latest
|
||||
permissions:
|
||||
id-token: write
|
||||
contents: read
|
||||
steps:
|
||||
- uses: actions/checkout@v6
|
||||
- run: mkdir -p artifacts
|
||||
|
||||
- name: Get git info
|
||||
id: git
|
||||
run: |
|
||||
echo "sha=${{ github.sha }}" >> "$GITHUB_OUTPUT"
|
||||
echo "describe=$(git describe --always --tags)" >> "$GITHUB_OUTPUT"
|
||||
|
||||
- name: Set up Depot CLI
|
||||
uses: depot/setup-action@v1
|
||||
|
||||
- name: Build reth image
|
||||
uses: depot/bake-action@v1
|
||||
env:
|
||||
DEPOT_TOKEN: ${{ secrets.DEPOT_TOKEN }}
|
||||
VERGEN_GIT_SHA: ${{ steps.git.outputs.sha }}
|
||||
VERGEN_GIT_DESCRIBE: ${{ steps.git.outputs.describe }}
|
||||
with:
|
||||
project: ${{ vars.DEPOT_PROJECT_ID }}
|
||||
files: docker-bake.hcl
|
||||
targets: ${{ inputs.hive_target }}
|
||||
push: false
|
||||
|
||||
- name: Upload reth image
|
||||
uses: actions/upload-artifact@v6
|
||||
with:
|
||||
name: ${{ inputs.artifact_name }}
|
||||
path: ./artifacts
|
||||
28
.github/workflows/docker.yml
vendored
28
.github/workflows/docker.yml
vendored
@@ -31,6 +31,7 @@ on:
|
||||
|
||||
jobs:
|
||||
build:
|
||||
if: github.repository == 'paradigmxyz/reth'
|
||||
name: Build Docker images
|
||||
runs-on: ubuntu-24.04
|
||||
permissions:
|
||||
@@ -69,18 +70,27 @@ jobs:
|
||||
# Add 'latest' tag for non-RC releases
|
||||
if [[ ! "$VERSION" =~ -rc ]]; then
|
||||
echo "ethereum_tags=${REGISTRY}/reth:${VERSION},${REGISTRY}/reth:latest" >> "$GITHUB_OUTPUT"
|
||||
{
|
||||
echo "ethereum_set<<EOF"
|
||||
echo "ethereum.tags=${REGISTRY}/reth:${VERSION}"
|
||||
echo "ethereum.tags=${REGISTRY}/reth:latest"
|
||||
echo "EOF"
|
||||
} >> "$GITHUB_OUTPUT"
|
||||
else
|
||||
echo "ethereum_tags=${REGISTRY}/reth:${VERSION}" >> "$GITHUB_OUTPUT"
|
||||
echo "ethereum_set=ethereum.tags=${REGISTRY}/reth:${VERSION}" >> "$GITHUB_OUTPUT"
|
||||
fi
|
||||
|
||||
elif [[ "${{ github.event_name }}" == "schedule" ]] || [[ "${{ inputs.build_type }}" == "nightly" ]]; then
|
||||
echo "targets=nightly" >> "$GITHUB_OUTPUT"
|
||||
echo "ethereum_tags=${REGISTRY}/reth:nightly" >> "$GITHUB_OUTPUT"
|
||||
echo "ethereum_set=ethereum.tags=${REGISTRY}/reth:nightly" >> "$GITHUB_OUTPUT"
|
||||
|
||||
else
|
||||
# git-sha build
|
||||
echo "targets=ethereum" >> "$GITHUB_OUTPUT"
|
||||
echo "ethereum_tags=${REGISTRY}/reth:${{ github.sha }}" >> "$GITHUB_OUTPUT"
|
||||
echo "ethereum_set=ethereum.tags=${REGISTRY}/reth:${{ github.sha }}" >> "$GITHUB_OUTPUT"
|
||||
fi
|
||||
|
||||
- name: Build and push images
|
||||
@@ -96,7 +106,7 @@ jobs:
|
||||
targets: ${{ steps.params.outputs.targets }}
|
||||
push: ${{ !(github.event_name == 'workflow_dispatch' && inputs.dry_run) }}
|
||||
set: |
|
||||
ethereum.tags=${{ steps.params.outputs.ethereum_tags }}
|
||||
${{ steps.params.outputs.ethereum_set }}
|
||||
|
||||
- name: Verify image architectures
|
||||
env:
|
||||
@@ -116,6 +126,18 @@ jobs:
|
||||
- name: Slack Webhook Action
|
||||
uses: rtCamp/action-slack-notify@v2
|
||||
env:
|
||||
SLACK_COLOR: ${{ job.status }}
|
||||
SLACK_MESSAGE: "Failed run: https://github.com/paradigmxyz/reth/actions/runs/${{ github.run_id }}"
|
||||
SLACK_COLOR: danger
|
||||
SLACK_ICON_EMOJI: ":rotating_light:"
|
||||
SLACK_USERNAME: "GitHub Actions"
|
||||
SLACK_TITLE: ":rotating_light: Nightly Docker Build Failed"
|
||||
SLACK_MESSAGE: |
|
||||
The scheduled nightly Docker build failed.
|
||||
|
||||
*Commit:* `${{ github.sha }}`
|
||||
*Branch:* `${{ github.ref_name }}`
|
||||
*Run:* <https://github.com/paradigmxyz/reth/actions/runs/${{ github.run_id }}|View logs>
|
||||
|
||||
*Action required:* Re-run the workflow or investigate the build failure.
|
||||
SLACK_FOOTER: "paradigmxyz/reth · docker.yml"
|
||||
MSG_MINIMAL: true
|
||||
SLACK_WEBHOOK: ${{ secrets.SLACK_WEBHOOK_URL }}
|
||||
|
||||
2
.github/workflows/e2e.yml
vendored
2
.github/workflows/e2e.yml
vendored
@@ -35,6 +35,7 @@ jobs:
|
||||
- name: Run e2e tests
|
||||
run: |
|
||||
cargo nextest run \
|
||||
--no-fail-fast \
|
||||
--locked --features "asm-keccak" \
|
||||
--workspace \
|
||||
--exclude 'example-*' \
|
||||
@@ -61,6 +62,7 @@ jobs:
|
||||
- name: Run RocksDB e2e tests
|
||||
run: |
|
||||
cargo nextest run \
|
||||
--no-fail-fast \
|
||||
--locked --features "edge" \
|
||||
-p reth-e2e-test-utils \
|
||||
-E 'binary(rocksdb)'
|
||||
|
||||
30
.github/workflows/hive.yml
vendored
30
.github/workflows/hive.yml
vendored
@@ -5,7 +5,7 @@ name: hive
|
||||
on:
|
||||
workflow_dispatch:
|
||||
schedule:
|
||||
- cron: "0 */6 * * *"
|
||||
- cron: "0 0 * * *"
|
||||
|
||||
env:
|
||||
CARGO_TERM_COLOR: always
|
||||
@@ -15,27 +15,24 @@ concurrency:
|
||||
cancel-in-progress: true
|
||||
|
||||
jobs:
|
||||
prepare-reth-stable:
|
||||
uses: ./.github/workflows/prepare-reth.yml
|
||||
build-reth-stable:
|
||||
uses: ./.github/workflows/docker-test.yml
|
||||
with:
|
||||
image_tag: ghcr.io/paradigmxyz/reth:latest
|
||||
binary_name: reth
|
||||
cargo_features: "asm-keccak"
|
||||
hive_target: hive-stable
|
||||
artifact_name: "reth-stable"
|
||||
secrets: inherit
|
||||
|
||||
prepare-reth-edge:
|
||||
uses: ./.github/workflows/prepare-reth.yml
|
||||
build-reth-edge:
|
||||
uses: ./.github/workflows/docker-test.yml
|
||||
with:
|
||||
image_tag: ghcr.io/paradigmxyz/reth:latest
|
||||
binary_name: reth
|
||||
cargo_features: "asm-keccak edge"
|
||||
hive_target: hive-edge
|
||||
artifact_name: "reth-edge"
|
||||
secrets: inherit
|
||||
|
||||
prepare-hive:
|
||||
if: github.repository == 'paradigmxyz/reth'
|
||||
timeout-minutes: 45
|
||||
runs-on:
|
||||
group: Reth
|
||||
runs-on: depot-ubuntu-latest-4
|
||||
steps:
|
||||
- uses: actions/checkout@v6
|
||||
- name: Checkout hive tests
|
||||
@@ -187,12 +184,11 @@ jobs:
|
||||
- sim: ethereum/eels/consume-rlp
|
||||
limit: .*tests/paris.*
|
||||
needs:
|
||||
- prepare-reth-stable
|
||||
- prepare-reth-edge
|
||||
- build-reth-stable
|
||||
- build-reth-edge
|
||||
- prepare-hive
|
||||
name: ${{ matrix.storage }} / ${{ matrix.scenario.sim }}${{ matrix.scenario.limit && format(' - {0}', matrix.scenario.limit) }}
|
||||
runs-on:
|
||||
group: Reth
|
||||
runs-on: depot-ubuntu-latest-4
|
||||
permissions:
|
||||
issues: write
|
||||
steps:
|
||||
|
||||
5
.github/workflows/integration.yml
vendored
5
.github/workflows/integration.yml
vendored
@@ -46,6 +46,7 @@ jobs:
|
||||
- name: Run tests
|
||||
run: |
|
||||
cargo nextest run \
|
||||
--no-fail-fast \
|
||||
--locked --features "asm-keccak ${{ matrix.network }} ${{ matrix.storage == 'edge' && 'edge' || '' }}" \
|
||||
--workspace --exclude ef-tests \
|
||||
-E "kind(test) and not binary(e2e_testsuite)"
|
||||
@@ -64,7 +65,7 @@ jobs:
|
||||
|
||||
era-files:
|
||||
name: era1 file integration tests once a day
|
||||
if: github.event_name == 'schedule'
|
||||
if: github.event_name == 'schedule' && github.repository == 'paradigmxyz/reth'
|
||||
runs-on: ubuntu-latest
|
||||
steps:
|
||||
- uses: actions/checkout@v6
|
||||
@@ -76,4 +77,4 @@ jobs:
|
||||
with:
|
||||
cache-on-failure: true
|
||||
- name: run era1 files integration tests
|
||||
run: cargo nextest run --release --package reth-era --test it -- --ignored
|
||||
run: cargo nextest run --no-fail-fast --release --package reth-era --test it -- --ignored
|
||||
|
||||
15
.github/workflows/kurtosis.yml
vendored
15
.github/workflows/kurtosis.yml
vendored
@@ -5,7 +5,7 @@ name: kurtosis
|
||||
on:
|
||||
workflow_dispatch:
|
||||
schedule:
|
||||
- cron: "0 */6 * * *"
|
||||
- cron: "0 0 * * *"
|
||||
|
||||
push:
|
||||
tags:
|
||||
@@ -19,11 +19,12 @@ concurrency:
|
||||
cancel-in-progress: true
|
||||
|
||||
jobs:
|
||||
prepare-reth:
|
||||
uses: ./.github/workflows/prepare-reth.yml
|
||||
build-reth:
|
||||
if: github.repository == 'paradigmxyz/reth'
|
||||
uses: ./.github/workflows/docker-test.yml
|
||||
with:
|
||||
image_tag: ghcr.io/paradigmxyz/reth:kurtosis-ci
|
||||
binary_name: reth
|
||||
hive_target: kurtosis
|
||||
secrets: inherit
|
||||
|
||||
test:
|
||||
timeout-minutes: 60
|
||||
@@ -32,7 +33,7 @@ jobs:
|
||||
name: run kurtosis
|
||||
runs-on: depot-ubuntu-latest
|
||||
needs:
|
||||
- prepare-reth
|
||||
- build-reth
|
||||
steps:
|
||||
- uses: actions/checkout@v6
|
||||
with:
|
||||
@@ -65,4 +66,4 @@ jobs:
|
||||
env:
|
||||
SLACK_COLOR: ${{ job.status }}
|
||||
SLACK_MESSAGE: "Failed run: https://github.com/paradigmxyz/reth/actions/runs/${{ github.run_id }}"
|
||||
SLACK_WEBHOOK: ${{ secrets.SLACK_WEBHOOK_URL }}
|
||||
SLACK_WEBHOOK: ${{ secrets.SLACK_HIVE_WEBHOOK_URL }}
|
||||
|
||||
61
.github/workflows/prepare-reth.yml
vendored
61
.github/workflows/prepare-reth.yml
vendored
@@ -1,61 +0,0 @@
|
||||
name: Prepare Reth Image
|
||||
|
||||
on:
|
||||
workflow_call:
|
||||
inputs:
|
||||
image_tag:
|
||||
required: true
|
||||
type: string
|
||||
description: "Docker image tag to use"
|
||||
binary_name:
|
||||
required: false
|
||||
type: string
|
||||
default: "reth"
|
||||
description: "Binary name to build"
|
||||
cargo_features:
|
||||
required: false
|
||||
type: string
|
||||
default: "asm-keccak"
|
||||
description: "Cargo features to enable"
|
||||
cargo_package:
|
||||
required: false
|
||||
type: string
|
||||
description: "Optional cargo package path"
|
||||
artifact_name:
|
||||
required: false
|
||||
type: string
|
||||
default: "artifacts"
|
||||
description: "Name for the uploaded artifact"
|
||||
|
||||
jobs:
|
||||
prepare-reth:
|
||||
if: github.repository == 'paradigmxyz/reth'
|
||||
timeout-minutes: 45
|
||||
runs-on: depot-ubuntu-latest
|
||||
steps:
|
||||
- uses: actions/checkout@v6
|
||||
- run: mkdir artifacts
|
||||
|
||||
- name: Set up Docker Buildx
|
||||
uses: docker/setup-buildx-action@v3
|
||||
|
||||
- name: Build and export reth image
|
||||
uses: docker/build-push-action@v6
|
||||
with:
|
||||
context: .
|
||||
file: .github/scripts/hive/Dockerfile
|
||||
tags: ${{ inputs.image_tag }}
|
||||
outputs: type=docker,dest=./artifacts/reth_image.tar
|
||||
build-args: |
|
||||
CARGO_BIN=${{ inputs.binary_name }}
|
||||
MANIFEST_PATH=${{ inputs.cargo_package }}
|
||||
FEATURES=${{ inputs.cargo_features }}
|
||||
cache-from: type=gha
|
||||
cache-to: type=gha,mode=max
|
||||
|
||||
- name: Upload reth image
|
||||
id: upload
|
||||
uses: actions/upload-artifact@v6
|
||||
with:
|
||||
name: ${{ inputs.artifact_name }}
|
||||
path: ./artifacts
|
||||
1
.github/workflows/reproducible-build.yml
vendored
1
.github/workflows/reproducible-build.yml
vendored
@@ -7,6 +7,7 @@ on:
|
||||
|
||||
jobs:
|
||||
build:
|
||||
if: github.repository == 'paradigmxyz/reth'
|
||||
name: build reproducible binaries
|
||||
runs-on: ${{ matrix.runner }}
|
||||
strategy:
|
||||
|
||||
2
.github/workflows/stage.yml
vendored
2
.github/workflows/stage.yml
vendored
@@ -38,7 +38,7 @@ jobs:
|
||||
cache-on-failure: true
|
||||
- name: Build reth
|
||||
run: |
|
||||
cargo install --features asm-keccak,jemalloc --path bin/reth
|
||||
cargo install --path bin/reth
|
||||
- name: Run headers stage
|
||||
run: |
|
||||
reth stage run headers --from ${{ env.FROM_BLOCK }} --to ${{ env.TO_BLOCK }} --commit --checkpoints
|
||||
|
||||
1
.github/workflows/stale.yml
vendored
1
.github/workflows/stale.yml
vendored
@@ -9,6 +9,7 @@ on:
|
||||
|
||||
jobs:
|
||||
close-issues:
|
||||
if: github.repository == 'paradigmxyz/reth'
|
||||
runs-on: ubuntu-latest
|
||||
permissions:
|
||||
issues: write
|
||||
|
||||
1
.github/workflows/sync-era.yml
vendored
1
.github/workflows/sync-era.yml
vendored
@@ -17,6 +17,7 @@ concurrency:
|
||||
|
||||
jobs:
|
||||
sync:
|
||||
if: github.repository == 'paradigmxyz/reth'
|
||||
name: sync (${{ matrix.chain.bin }})
|
||||
runs-on: depot-ubuntu-latest
|
||||
env:
|
||||
|
||||
1
.github/workflows/sync.yml
vendored
1
.github/workflows/sync.yml
vendored
@@ -17,6 +17,7 @@ concurrency:
|
||||
|
||||
jobs:
|
||||
sync:
|
||||
if: github.repository == 'paradigmxyz/reth'
|
||||
name: sync (${{ matrix.chain.bin }})
|
||||
runs-on: depot-ubuntu-latest
|
||||
env:
|
||||
|
||||
3
.github/workflows/unit.yml
vendored
3
.github/workflows/unit.yml
vendored
@@ -49,6 +49,7 @@ jobs:
|
||||
- name: Run tests
|
||||
run: |
|
||||
cargo nextest run \
|
||||
--no-fail-fast \
|
||||
--features "${{ matrix.features }} $EDGE_FEATURES" --locked \
|
||||
${{ matrix.exclude_args }} --workspace \
|
||||
--exclude ef-tests --no-tests=warn \
|
||||
@@ -87,7 +88,7 @@ jobs:
|
||||
- uses: Swatinem/rust-cache@v2
|
||||
with:
|
||||
cache-on-failure: true
|
||||
- run: cargo nextest run --cargo-profile hivetests -p ef-tests --features "asm-keccak ef-tests"
|
||||
- run: cargo nextest run --no-fail-fast --cargo-profile hivetests -p ef-tests --features "asm-keccak ef-tests"
|
||||
|
||||
doc:
|
||||
name: doc tests
|
||||
|
||||
@@ -381,7 +381,7 @@ cargo nextest run --workspace
|
||||
cargo bench --bench bench_name
|
||||
|
||||
# Build optimized binary
|
||||
cargo build --release --features "jemalloc asm-keccak"
|
||||
cargo build --release
|
||||
|
||||
# Check compilation for all features
|
||||
cargo check --workspace --all-features
|
||||
|
||||
879
Cargo.lock
generated
879
Cargo.lock
generated
File diff suppressed because it is too large
Load Diff
116
Cargo.toml
116
Cargo.toml
@@ -1,5 +1,5 @@
|
||||
[workspace.package]
|
||||
version = "1.10.2"
|
||||
version = "1.11.3"
|
||||
edition = "2024"
|
||||
rust-version = "1.88"
|
||||
license = "MIT OR Apache-2.0"
|
||||
@@ -83,8 +83,6 @@ members = [
|
||||
"crates/prune/db",
|
||||
"crates/prune/prune",
|
||||
"crates/prune/types",
|
||||
"crates/ress/protocol",
|
||||
"crates/ress/provider",
|
||||
"crates/revm/",
|
||||
"crates/rpc/ipc/",
|
||||
"crates/rpc/rpc-api/",
|
||||
@@ -101,7 +99,6 @@ members = [
|
||||
"crates/stages/api/",
|
||||
"crates/stages/stages/",
|
||||
"crates/stages/types/",
|
||||
"crates/stateless",
|
||||
"crates/static-file/static-file",
|
||||
"crates/static-file/types/",
|
||||
"crates/storage/codecs/",
|
||||
@@ -125,7 +122,6 @@ members = [
|
||||
"crates/trie/db",
|
||||
"crates/trie/parallel/",
|
||||
"crates/trie/sparse",
|
||||
"crates/trie/sparse-parallel/",
|
||||
"crates/trie/trie",
|
||||
"examples/beacon-api-sidecar-fetcher/",
|
||||
"examples/beacon-api-sse/",
|
||||
@@ -310,6 +306,11 @@ inherits = "release"
|
||||
lto = "fat"
|
||||
codegen-units = 1
|
||||
|
||||
[profile.maxperf-symbols]
|
||||
inherits = "maxperf"
|
||||
debug = "full"
|
||||
strip = "none"
|
||||
|
||||
[profile.reproducible]
|
||||
inherits = "release"
|
||||
panic = "abort"
|
||||
@@ -418,7 +419,6 @@ reth-rpc-convert = { path = "crates/rpc/rpc-convert" }
|
||||
reth-stages = { path = "crates/stages/stages" }
|
||||
reth-stages-api = { path = "crates/stages/api" }
|
||||
reth-stages-types = { path = "crates/stages/types", default-features = false }
|
||||
reth-stateless = { path = "crates/stateless", default-features = false }
|
||||
reth-static-file = { path = "crates/static-file/static-file" }
|
||||
reth-static-file-types = { path = "crates/static-file/types", default-features = false }
|
||||
reth-storage-api = { path = "crates/storage/storage-api", default-features = false }
|
||||
@@ -434,10 +434,7 @@ reth-trie-common = { path = "crates/trie/common", default-features = false }
|
||||
reth-trie-db = { path = "crates/trie/db" }
|
||||
reth-trie-parallel = { path = "crates/trie/parallel" }
|
||||
reth-trie-sparse = { path = "crates/trie/sparse", default-features = false }
|
||||
reth-trie-sparse-parallel = { path = "crates/trie/sparse-parallel" }
|
||||
reth-zstd-compressors = { path = "crates/storage/zstd-compressors", default-features = false }
|
||||
reth-ress-protocol = { path = "crates/ress/protocol" }
|
||||
reth-ress-provider = { path = "crates/ress/provider" }
|
||||
|
||||
# revm
|
||||
revm = { version = "34.0.0", default-features = false }
|
||||
@@ -451,46 +448,57 @@ op-revm = { version = "15.0.0", default-features = false }
|
||||
revm-inspectors = "0.34.2"
|
||||
|
||||
# eth
|
||||
alloy-dyn-abi = "1.5.4"
|
||||
alloy-primitives = { version = "1.5.4", default-features = false, features = ["map-foldhash"] }
|
||||
alloy-sol-types = { version = "1.5.4", default-features = false }
|
||||
alloy-dyn-abi = "1.5.6"
|
||||
alloy-primitives = { version = "1.5.6", default-features = false, features = [
|
||||
"map-foldhash",
|
||||
] }
|
||||
alloy-sol-types = { version = "1.5.6", default-features = false }
|
||||
|
||||
alloy-chains = { version = "0.2.5", default-features = false }
|
||||
alloy-eip2124 = { version = "0.2.0", default-features = false }
|
||||
alloy-eip7928 = { version = "0.3.0", default-features = false }
|
||||
alloy-evm = { version = "0.27.2", default-features = false }
|
||||
alloy-rlp = { version = "0.3.10", default-features = false, features = ["core-net"] }
|
||||
alloy-rlp = { version = "0.3.13", default-features = false, features = [
|
||||
"core-net",
|
||||
] }
|
||||
alloy-trie = { version = "0.9.4", default-features = false }
|
||||
|
||||
alloy-hardforks = "0.4.5"
|
||||
|
||||
alloy-consensus = { version = "1.6.1", default-features = false }
|
||||
alloy-contract = { version = "1.6.1", default-features = false }
|
||||
alloy-eips = { version = "1.6.1", default-features = false }
|
||||
alloy-genesis = { version = "1.6.1", default-features = false }
|
||||
alloy-json-rpc = { version = "1.6.1", default-features = false }
|
||||
alloy-network = { version = "1.6.1", default-features = false }
|
||||
alloy-network-primitives = { version = "1.6.1", default-features = false }
|
||||
alloy-provider = { version = "1.6.1", features = ["reqwest", "debug-api"], default-features = false }
|
||||
alloy-pubsub = { version = "1.6.1", default-features = false }
|
||||
alloy-rpc-client = { version = "1.6.1", default-features = false }
|
||||
alloy-rpc-types = { version = "1.6.1", features = ["eth"], default-features = false }
|
||||
alloy-rpc-types-admin = { version = "1.6.1", default-features = false }
|
||||
alloy-rpc-types-anvil = { version = "1.6.1", default-features = false }
|
||||
alloy-rpc-types-beacon = { version = "1.6.1", default-features = false }
|
||||
alloy-rpc-types-debug = { version = "1.6.1", default-features = false }
|
||||
alloy-rpc-types-engine = { version = "1.6.1", default-features = false }
|
||||
alloy-rpc-types-eth = { version = "1.6.1", default-features = false }
|
||||
alloy-rpc-types-mev = { version = "1.6.1", default-features = false }
|
||||
alloy-rpc-types-trace = { version = "1.6.1", default-features = false }
|
||||
alloy-rpc-types-txpool = { version = "1.6.1", default-features = false }
|
||||
alloy-serde = { version = "1.6.1", default-features = false }
|
||||
alloy-signer = { version = "1.6.1", default-features = false }
|
||||
alloy-signer-local = { version = "1.6.1", default-features = false }
|
||||
alloy-transport = { version = "1.6.1" }
|
||||
alloy-transport-http = { version = "1.6.1", features = ["reqwest-rustls-tls"], default-features = false }
|
||||
alloy-transport-ipc = { version = "1.6.1", default-features = false }
|
||||
alloy-transport-ws = { version = "1.6.1", default-features = false }
|
||||
alloy-consensus = { version = "1.6.3", default-features = false }
|
||||
alloy-contract = { version = "1.6.3", default-features = false }
|
||||
alloy-eips = { version = "1.6.3", default-features = false }
|
||||
alloy-genesis = { version = "1.6.3", default-features = false }
|
||||
alloy-json-rpc = { version = "1.6.3", default-features = false }
|
||||
alloy-network = { version = "1.6.3", default-features = false }
|
||||
alloy-network-primitives = { version = "1.6.3", default-features = false }
|
||||
alloy-provider = { version = "1.6.3", features = [
|
||||
"reqwest",
|
||||
"debug-api",
|
||||
], default-features = false }
|
||||
alloy-pubsub = { version = "1.6.3", default-features = false }
|
||||
alloy-rpc-client = { version = "1.6.3", default-features = false }
|
||||
alloy-rpc-types = { version = "1.6.3", features = [
|
||||
"eth",
|
||||
], default-features = false }
|
||||
alloy-rpc-types-admin = { version = "1.6.3", default-features = false }
|
||||
alloy-rpc-types-anvil = { version = "1.6.3", default-features = false }
|
||||
alloy-rpc-types-beacon = { version = "1.6.3", default-features = false }
|
||||
alloy-rpc-types-debug = { version = "1.6.3", default-features = false }
|
||||
alloy-rpc-types-engine = { version = "1.6.3", default-features = false }
|
||||
alloy-rpc-types-eth = { version = "1.6.3", default-features = false }
|
||||
alloy-rpc-types-mev = { version = "1.6.3", default-features = false }
|
||||
alloy-rpc-types-trace = { version = "1.6.3", default-features = false }
|
||||
alloy-rpc-types-txpool = { version = "1.6.3", default-features = false }
|
||||
alloy-serde = { version = "1.6.3", default-features = false }
|
||||
alloy-signer = { version = "1.6.3", default-features = false }
|
||||
alloy-signer-local = { version = "1.6.3", default-features = false }
|
||||
alloy-transport = { version = "1.6.3" }
|
||||
alloy-transport-http = { version = "1.6.3", features = [
|
||||
"reqwest-rustls-tls",
|
||||
], default-features = false }
|
||||
alloy-transport-ipc = { version = "1.6.3", default-features = false }
|
||||
alloy-transport-ws = { version = "1.6.3", default-features = false }
|
||||
|
||||
# op
|
||||
alloy-op-evm = { version = "0.27.2", default-features = false }
|
||||
@@ -507,7 +515,10 @@ either = { version = "1.15.0", default-features = false }
|
||||
arrayvec = { version = "0.7.6", default-features = false }
|
||||
aquamarine = "0.6"
|
||||
auto_impl = "1"
|
||||
backon = { version = "1.2", default-features = false, features = ["std-blocking-sleep", "tokio-sleep"] }
|
||||
backon = { version = "1.2", default-features = false, features = [
|
||||
"std-blocking-sleep",
|
||||
"tokio-sleep",
|
||||
] }
|
||||
bincode = "1.3"
|
||||
bitflags = "2.4"
|
||||
boyer-moore-magiclen = "0.2.16"
|
||||
@@ -529,9 +540,13 @@ itertools = { version = "0.14", default-features = false }
|
||||
linked_hash_set = "0.1"
|
||||
lz4 = "1.28.1"
|
||||
modular-bitfield = "0.13.1"
|
||||
notify = { version = "8.0.0", default-features = false, features = ["macos_fsevent"] }
|
||||
notify = { version = "8.0.0", default-features = false, features = [
|
||||
"macos_fsevent",
|
||||
] }
|
||||
nybbles = { version = "0.4.8", default-features = false }
|
||||
once_cell = { version = "1.19", default-features = false, features = ["critical-section"] }
|
||||
once_cell = { version = "1.19", default-features = false, features = [
|
||||
"critical-section",
|
||||
] }
|
||||
parking_lot = "0.12"
|
||||
paste = "1.0"
|
||||
rand = "0.9"
|
||||
@@ -550,7 +565,9 @@ strum_macros = "0.27"
|
||||
syn = "2.0"
|
||||
thiserror = { version = "2.0.0", default-features = false }
|
||||
tar = "0.4.44"
|
||||
tracing = { version = "0.1.0", default-features = false }
|
||||
tracing = { version = "0.1.0", default-features = false, features = [
|
||||
"attributes",
|
||||
] }
|
||||
tracing-appender = "0.2"
|
||||
url = { version = "2.3", default-features = false }
|
||||
zstd = "0.13"
|
||||
@@ -588,7 +605,11 @@ futures-util = { version = "0.3", default-features = false }
|
||||
hyper = "1.3"
|
||||
hyper-util = "0.1.5"
|
||||
pin-project = "1.0.12"
|
||||
reqwest = { version = "0.12", default-features = false, features = ["rustls-tls", "rustls-tls-native-roots", "stream"] }
|
||||
reqwest = { version = "0.12", default-features = false, features = [
|
||||
"rustls-tls",
|
||||
"rustls-tls-native-roots",
|
||||
"stream",
|
||||
] }
|
||||
tracing-futures = "0.2"
|
||||
tower = "0.5"
|
||||
tower-http = "0.6"
|
||||
@@ -613,7 +634,10 @@ proptest-arbitrary-interop = "0.1.0"
|
||||
# crypto
|
||||
enr = { version = "0.13", default-features = false }
|
||||
k256 = { version = "0.13", default-features = false, features = ["ecdsa"] }
|
||||
secp256k1 = { version = "0.30", default-features = false, features = ["global-context", "recovery"] }
|
||||
secp256k1 = { version = "0.30", default-features = false, features = [
|
||||
"global-context",
|
||||
"recovery",
|
||||
] }
|
||||
# rand 8 for secp256k1
|
||||
rand_08 = { package = "rand", version = "0.8" }
|
||||
|
||||
|
||||
@@ -51,14 +51,15 @@ RUN --mount=type=secret,id=DEPOT_TOKEN,env=SCCACHE_WEBDAV_TOKEN \
|
||||
--mount=type=cache,target=/usr/local/cargo/registry,sharing=shared \
|
||||
--mount=type=cache,target=/usr/local/cargo/git,sharing=shared \
|
||||
--mount=type=cache,target=$SCCACHE_DIR,sharing=shared \
|
||||
export RUSTC_WRAPPER=sccache SCCACHE_WEBDAV_ENDPOINT=https://cache.depot.dev SCCACHE_DIR=/sccache && \
|
||||
sccache --start-server && \
|
||||
if [ -n "$RUSTFLAGS" ]; then \
|
||||
export RUSTFLAGS="$RUSTFLAGS"; \
|
||||
elif [ "$TARGETPLATFORM" = "linux/amd64" ]; then \
|
||||
export RUSTFLAGS="-C target-cpu=x86-64-v3 -C target-feature=+pclmulqdq"; \
|
||||
fi && \
|
||||
cargo build --profile $BUILD_PROFILE --features "$FEATURES" --locked --bin $BINARY --manifest-path $MANIFEST_PATH/Cargo.toml
|
||||
|
||||
RUN sccache --show-stats || true
|
||||
cargo build --profile $BUILD_PROFILE --features "$FEATURES" --locked --bin $BINARY --manifest-path $MANIFEST_PATH/Cargo.toml && \
|
||||
sccache --show-stats
|
||||
|
||||
# Copy binary to a known location (ARG not resolved in COPY)
|
||||
# Note: Custom profiles like maxperf/profiling output to target/<profile>/, not target/release/
|
||||
|
||||
18
Makefile
18
Makefile
@@ -12,12 +12,7 @@ FULL_DB_TOOLS_DIR := $(shell pwd)/$(DB_TOOLS_DIR)/
|
||||
CARGO_TARGET_DIR ?= target
|
||||
|
||||
# List of features to use when building. Can be overridden via the environment.
|
||||
# No jemalloc on Windows
|
||||
ifeq ($(OS),Windows_NT)
|
||||
FEATURES ?= asm-keccak min-debug-logs
|
||||
else
|
||||
FEATURES ?= jemalloc asm-keccak min-debug-logs
|
||||
endif
|
||||
FEATURES ?=
|
||||
|
||||
# Cargo profile for builds. Default is for local builds, CI uses an override.
|
||||
PROFILE ?= release
|
||||
@@ -158,7 +153,7 @@ COV_FILE := lcov.info
|
||||
.PHONY: test-unit
|
||||
test-unit: ## Run unit tests.
|
||||
cargo install cargo-nextest --locked
|
||||
cargo nextest run $(UNIT_TEST_ARGS)
|
||||
cargo nextest run --no-fail-fast $(UNIT_TEST_ARGS)
|
||||
|
||||
|
||||
.PHONY: cov-unit
|
||||
@@ -191,7 +186,7 @@ $(EEST_TESTS_DIR):
|
||||
|
||||
.PHONY: ef-tests
|
||||
ef-tests: $(EF_TESTS_DIR) $(EEST_TESTS_DIR) ## Runs Legacy and EEST tests.
|
||||
cargo nextest run -p ef-tests --release --features ef-tests
|
||||
cargo nextest run --no-fail-fast -p ef-tests --release --features ef-tests
|
||||
|
||||
##@ reth-bench
|
||||
|
||||
@@ -238,16 +233,15 @@ update-book-cli: build-debug ## Update book cli documentation.
|
||||
|
||||
.PHONY: profiling
|
||||
profiling: ## Builds `reth` with optimisations, but also symbols.
|
||||
RUSTFLAGS="-C target-cpu=native" cargo build --profile profiling --features jemalloc,asm-keccak
|
||||
RUSTFLAGS="-C target-cpu=native" cargo build --profile profiling
|
||||
|
||||
.PHONY: maxperf
|
||||
maxperf: ## Builds `reth` with the most aggressive optimisations.
|
||||
RUSTFLAGS="-C target-cpu=native" cargo build --profile maxperf --features jemalloc,asm-keccak
|
||||
RUSTFLAGS="-C target-cpu=native" cargo build --profile maxperf
|
||||
|
||||
.PHONY: maxperf-no-asm
|
||||
maxperf-no-asm: ## Builds `reth` with the most aggressive optimisations, minus the "asm-keccak" feature.
|
||||
RUSTFLAGS="-C target-cpu=native" cargo build --profile maxperf --features jemalloc
|
||||
|
||||
RUSTFLAGS="-C target-cpu=native" cargo build --profile maxperf --no-default-features --features jemalloc,min-debug-logs,otlp,otlp-logs,reth-revm/portable,js-tracer,keccak-cache-global,rocksdb
|
||||
|
||||
fmt:
|
||||
cargo +nightly fmt
|
||||
|
||||
@@ -30,7 +30,7 @@ reth-bench-compare \
|
||||
| `--draw` | Generate charts (needs Python/uv) | `false` | No |
|
||||
| `--profile` | Enable CPU profiling (needs samply) | `false` | No |
|
||||
| `-vvvv` | Debug logging | Info | No |
|
||||
| `--features <FEATURES>` | Rust features for both builds | `jemalloc,asm-keccak` | No |
|
||||
| `--features <FEATURES>` | Extra Rust features for both builds | - | No |
|
||||
| `--rustflags <FLAGS>` | RUSTFLAGS for both builds | `-C target-cpu=native` | No |
|
||||
| `--baseline-features <FEATURES>` | Features for baseline only | Inherits `--features` | No |
|
||||
| `--feature-features <FEATURES>` | Features for feature only | Inherits `--features` | No |
|
||||
|
||||
@@ -191,10 +191,9 @@ pub(crate) struct Args {
|
||||
#[arg(trailing_var_arg = true, allow_hyphen_values = true)]
|
||||
pub reth_args: Vec<String>,
|
||||
|
||||
/// Comma-separated list of features to enable during reth compilation (applied to both builds)
|
||||
///
|
||||
/// Example: `jemalloc,asm-keccak`
|
||||
#[arg(long, value_name = "FEATURES", default_value = "jemalloc,asm-keccak")]
|
||||
/// Comma-separated list of extra features to enable during reth compilation (applied to both
|
||||
/// builds)
|
||||
#[arg(long, value_name = "FEATURES", default_value = "")]
|
||||
pub features: String,
|
||||
|
||||
/// Comma-separated list of features to enable only for baseline build (overrides --features)
|
||||
@@ -205,7 +204,7 @@ pub(crate) struct Args {
|
||||
|
||||
/// Comma-separated list of features to enable only for feature build (overrides --features)
|
||||
///
|
||||
/// Example: `--feature-features jemalloc,asm-keccak`
|
||||
/// Example: `--feature-features jemalloc-prof`
|
||||
#[arg(long, value_name = "FEATURES")]
|
||||
pub feature_features: Option<String>,
|
||||
|
||||
@@ -277,10 +276,8 @@ impl Args {
|
||||
/// Get the default RPC URL for a given chain
|
||||
const fn get_default_rpc_url(chain: &Chain) -> &'static str {
|
||||
match chain.id() {
|
||||
8453 => "https://base.reth.rs/rpc", // base
|
||||
84532 => "https://base-sepolia.rpc.ithaca.xyz", // base-sepolia
|
||||
27082 => "https://rpc.hoodi.ethpandaops.io", // hoodi
|
||||
_ => "https://ethereum.reth.rs/rpc", // mainnet and fallback
|
||||
27082 => "https://rpc.hoodi.ethpandaops.io", // hoodi
|
||||
_ => "https://ethereum.reth.rs/rpc", // mainnet and fallback
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -32,7 +32,7 @@ Otherwise, running `make maxperf` at the root of the repo should be sufficient f
|
||||
The `reth-bench new-payload-fcu` command is the most representative of ethereum mainnet live sync, alternating between sending `engine_newPayload` calls and `engine_forkchoiceUpdated` calls.
|
||||
|
||||
The `new-payload-fcu` command supports two optional waiting modes that can be used together or independently:
|
||||
- `--wait-time <duration>`: Fixed sleep interval between blocks (e.g., `--wait-time 100ms`)
|
||||
- `--wait-time <duration>`: Fixed sleep interval between blocks (e.g., `--wait-time 100ms` or `--wait-time 400` for 400ms)
|
||||
- `--wait-for-persistence`: Waits for blocks to be persisted using the `reth_subscribePersistedBlock` subscription
|
||||
|
||||
When using `--wait-for-persistence`, the benchmark waits after every `(threshold + 1)` blocks, where the threshold defaults to the engine's persistence threshold (2). This can be customized with `--persistence-threshold <N>`.
|
||||
@@ -73,7 +73,7 @@ make profiling
|
||||
|
||||
If the purpose of the benchmark is to obtain `jemalloc` memory profiles that can then be analyzed by `jeprof`, it should be compiled with the `profiling` profile and the `jemalloc-prof` feature:
|
||||
```bash
|
||||
RUSTFLAGS="-C target-cpu=native" cargo build --profile profiling --features "jemalloc-prof,asm-keccak"
|
||||
RUSTFLAGS="-C target-cpu=native" cargo build --profile profiling --features "jemalloc-prof"
|
||||
```
|
||||
|
||||
> [!NOTE]
|
||||
@@ -82,7 +82,7 @@ RUSTFLAGS="-C target-cpu=native" cargo build --profile profiling --features "jem
|
||||
Finally, if the purpose of the benchmark is to profile the node when `snmalloc` is configured as the default allocator, it would be built with the following
|
||||
command:
|
||||
```bash
|
||||
RUSTFLAGS="-C target-cpu=native" cargo build --profile profiling --no-default-features --features "snmalloc-native,asm-keccak"
|
||||
RUSTFLAGS="-C target-cpu=native" cargo build --profile profiling --no-default-features --features "snmalloc-native,asm-keccak,min-debug-logs"
|
||||
```
|
||||
|
||||
### Run the Benchmark:
|
||||
|
||||
@@ -192,6 +192,15 @@ impl Command {
|
||||
parent_header = block.header;
|
||||
parent_hash = block_hash;
|
||||
blocks_processed += 1;
|
||||
|
||||
let progress = match mode {
|
||||
RampMode::Blocks(total) => format!("{blocks_processed}/{total}"),
|
||||
RampMode::TargetGasLimit(target) => {
|
||||
let pct = (parent_header.gas_limit as f64 / target as f64 * 100.0).min(100.0);
|
||||
format!("{pct:.1}%")
|
||||
}
|
||||
};
|
||||
info!(target: "reth-bench", progress, block_number = parent_header.number, gas_limit = parent_header.gas_limit, "Block processed");
|
||||
}
|
||||
|
||||
let final_gas_limit = parent_header.gas_limit;
|
||||
|
||||
@@ -2,7 +2,10 @@
|
||||
|
||||
use crate::valid_payload::call_forkchoice_updated;
|
||||
use eyre::Result;
|
||||
use std::io::{BufReader, Read};
|
||||
use std::{
|
||||
io::{BufReader, Read},
|
||||
time::Duration,
|
||||
};
|
||||
|
||||
/// Read input from either a file path or stdin.
|
||||
pub(crate) fn read_input(path: Option<&str>) -> Result<String> {
|
||||
@@ -51,6 +54,22 @@ pub(crate) fn parse_gas_limit(s: &str) -> eyre::Result<u64> {
|
||||
let base: u64 = num_str.trim().parse()?;
|
||||
base.checked_mul(multiplier).ok_or_else(|| eyre::eyre!("value overflow"))
|
||||
}
|
||||
|
||||
/// Parses a duration string, treating bare integers as milliseconds.
|
||||
///
|
||||
/// Accepts either a `humantime` duration string (e.g. `"100ms"`, `"2s"`) or a plain
|
||||
/// integer which is interpreted as milliseconds (e.g. `"400"` → 400ms).
|
||||
pub(crate) fn parse_duration(s: &str) -> eyre::Result<Duration> {
|
||||
match humantime::parse_duration(s) {
|
||||
Ok(d) => Ok(d),
|
||||
Err(_) => {
|
||||
let millis: u64 =
|
||||
s.trim().parse().map_err(|_| eyre::eyre!("invalid duration: {s:?}"))?;
|
||||
Ok(Duration::from_millis(millis))
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
use alloy_consensus::Header;
|
||||
use alloy_eips::eip4844::kzg_to_versioned_hash;
|
||||
use alloy_primitives::{Address, B256};
|
||||
@@ -270,4 +289,24 @@ mod tests {
|
||||
assert!(parse_gas_limit("G").is_err());
|
||||
assert!(parse_gas_limit("-1G").is_err());
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_parse_duration_with_unit() {
|
||||
assert_eq!(parse_duration("100ms").unwrap(), Duration::from_millis(100));
|
||||
assert_eq!(parse_duration("2s").unwrap(), Duration::from_secs(2));
|
||||
assert_eq!(parse_duration("1m").unwrap(), Duration::from_secs(60));
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_parse_duration_bare_millis() {
|
||||
assert_eq!(parse_duration("400").unwrap(), Duration::from_millis(400));
|
||||
assert_eq!(parse_duration("0").unwrap(), Duration::from_millis(0));
|
||||
assert_eq!(parse_duration("1000").unwrap(), Duration::from_millis(1000));
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_parse_duration_errors() {
|
||||
assert!(parse_duration("abc").is_err());
|
||||
assert!(parse_duration("").is_err());
|
||||
}
|
||||
}
|
||||
|
||||
@@ -12,12 +12,12 @@
|
||||
use crate::{
|
||||
bench::{
|
||||
context::BenchContext,
|
||||
helpers::parse_duration,
|
||||
output::{
|
||||
write_benchmark_results, CombinedResult, NewPayloadResult, TotalGasOutput, TotalGasRow,
|
||||
},
|
||||
persistence_waiter::{
|
||||
derive_ws_rpc_url, setup_persistence_subscription, PersistenceWaiter,
|
||||
PERSISTENCE_CHECKPOINT_TIMEOUT,
|
||||
},
|
||||
},
|
||||
valid_payload::{block_to_new_payload, call_forkchoice_updated, call_new_payload},
|
||||
@@ -26,7 +26,6 @@ use alloy_provider::Provider;
|
||||
use alloy_rpc_types_engine::ForkchoiceState;
|
||||
use clap::Parser;
|
||||
use eyre::{Context, OptionExt};
|
||||
use humantime::parse_duration;
|
||||
use reth_cli_runner::CliContext;
|
||||
use reth_engine_primitives::config::DEFAULT_PERSISTENCE_THRESHOLD;
|
||||
use reth_node_core::args::BenchmarkArgs;
|
||||
@@ -41,6 +40,9 @@ pub struct Command {
|
||||
rpc_url: String,
|
||||
|
||||
/// How long to wait after a forkchoice update before sending the next payload.
|
||||
///
|
||||
/// Accepts a duration string (e.g. `100ms`, `2s`) or a bare integer treated as
|
||||
/// milliseconds (e.g. `400`).
|
||||
#[arg(long, value_name = "WAIT_TIME", value_parser = parse_duration, verbatim_doc_comment)]
|
||||
wait_time: Option<Duration>,
|
||||
|
||||
@@ -67,6 +69,19 @@ pub struct Command {
|
||||
)]
|
||||
persistence_threshold: u64,
|
||||
|
||||
/// Timeout for waiting on persistence at each checkpoint.
|
||||
///
|
||||
/// Must be long enough to account for the persistence thread being blocked
|
||||
/// by pruning after the previous save.
|
||||
#[arg(
|
||||
long = "persistence-timeout",
|
||||
value_name = "PERSISTENCE_TIMEOUT",
|
||||
value_parser = parse_duration,
|
||||
default_value = "120s",
|
||||
verbatim_doc_comment
|
||||
)]
|
||||
persistence_timeout: Duration,
|
||||
|
||||
/// The size of the block buffer (channel capacity) for prefetching blocks from the RPC
|
||||
/// endpoint.
|
||||
#[arg(
|
||||
@@ -105,12 +120,12 @@ impl Command {
|
||||
self.benchmark.ws_rpc_url.as_deref(),
|
||||
&self.benchmark.engine_rpc_url,
|
||||
)?;
|
||||
let sub = setup_persistence_subscription(ws_url).await?;
|
||||
let sub = setup_persistence_subscription(ws_url, self.persistence_timeout).await?;
|
||||
Some(PersistenceWaiter::with_duration_and_subscription(
|
||||
duration,
|
||||
sub,
|
||||
self.persistence_threshold,
|
||||
PERSISTENCE_CHECKPOINT_TIMEOUT,
|
||||
self.persistence_timeout,
|
||||
))
|
||||
}
|
||||
(Some(duration), false) => Some(PersistenceWaiter::with_duration(duration)),
|
||||
@@ -119,11 +134,11 @@ impl Command {
|
||||
self.benchmark.ws_rpc_url.as_deref(),
|
||||
&self.benchmark.engine_rpc_url,
|
||||
)?;
|
||||
let sub = setup_persistence_subscription(ws_url).await?;
|
||||
let sub = setup_persistence_subscription(ws_url, self.persistence_timeout).await?;
|
||||
Some(PersistenceWaiter::with_subscription(
|
||||
sub,
|
||||
self.persistence_threshold,
|
||||
PERSISTENCE_CHECKPOINT_TIMEOUT,
|
||||
self.persistence_timeout,
|
||||
))
|
||||
}
|
||||
(None, false) => None,
|
||||
@@ -138,6 +153,7 @@ impl Command {
|
||||
..
|
||||
} = BenchContext::new(&self.benchmark, self.rpc_url).await?;
|
||||
|
||||
let total_blocks = benchmark_mode.total_blocks();
|
||||
let buffer_size = self.rpc_block_buffer_size;
|
||||
|
||||
// Use a oneshot channel to propagate errors from the spawned task
|
||||
@@ -191,6 +207,7 @@ impl Command {
|
||||
});
|
||||
|
||||
let mut results = Vec::new();
|
||||
let mut blocks_processed = 0u64;
|
||||
let total_benchmark_duration = Instant::now();
|
||||
let mut total_wait_time = Duration::ZERO;
|
||||
|
||||
@@ -234,8 +251,13 @@ impl Command {
|
||||
|
||||
// Exclude time spent waiting on the block prefetch channel from the benchmark duration.
|
||||
// We want to measure engine throughput, not RPC fetch latency.
|
||||
blocks_processed += 1;
|
||||
let current_duration = total_benchmark_duration.elapsed() - total_wait_time;
|
||||
info!(target: "reth-bench", %combined_result);
|
||||
let progress = match total_blocks {
|
||||
Some(total) => format!("{blocks_processed}/{total}"),
|
||||
None => format!("{blocks_processed}"),
|
||||
};
|
||||
info!(target: "reth-bench", progress, %combined_result);
|
||||
|
||||
if let Some(w) = &mut waiter {
|
||||
w.on_block(block_number).await?;
|
||||
|
||||
@@ -52,6 +52,7 @@ impl Command {
|
||||
..
|
||||
} = BenchContext::new(&self.benchmark, self.rpc_url).await?;
|
||||
|
||||
let total_blocks = benchmark_mode.total_blocks();
|
||||
let buffer_size = self.rpc_block_buffer_size;
|
||||
|
||||
// Use a oneshot channel to propagate errors from the spawned task
|
||||
@@ -82,8 +83,8 @@ impl Command {
|
||||
}
|
||||
});
|
||||
|
||||
// put results in a summary vec so they can be printed at the end
|
||||
let mut results = Vec::new();
|
||||
let mut blocks_processed = 0u64;
|
||||
let total_benchmark_duration = Instant::now();
|
||||
let mut total_wait_time = Duration::ZERO;
|
||||
|
||||
@@ -105,7 +106,12 @@ impl Command {
|
||||
call_new_payload(&auth_provider, version, params).await?;
|
||||
|
||||
let new_payload_result = NewPayloadResult { gas_used, latency: start.elapsed() };
|
||||
info!(target: "reth-bench", %new_payload_result);
|
||||
blocks_processed += 1;
|
||||
let progress = match total_blocks {
|
||||
Some(total) => format!("{blocks_processed}/{total}"),
|
||||
None => format!("{blocks_processed}"),
|
||||
};
|
||||
info!(target: "reth-bench", progress, %new_payload_result);
|
||||
|
||||
// current duration since the start of the benchmark minus the time
|
||||
// waiting for blocks
|
||||
|
||||
@@ -22,9 +22,6 @@ use tracing::{debug, info};
|
||||
const DEFAULT_WS_RPC_PORT: u16 = 8546;
|
||||
use url::Url;
|
||||
|
||||
/// Default timeout for waiting on persistence.
|
||||
pub(crate) const PERSISTENCE_CHECKPOINT_TIMEOUT: Duration = Duration::from_secs(60);
|
||||
|
||||
/// Returns the websocket RPC URL used for the persistence subscription.
|
||||
///
|
||||
/// Preference:
|
||||
@@ -157,12 +154,18 @@ impl PersistenceSubscription {
|
||||
}
|
||||
|
||||
/// Establishes a websocket connection and subscribes to `reth_subscribePersistedBlock`.
|
||||
///
|
||||
/// The `keepalive_interval` is set to match `persistence_timeout` so that the `WebSocket`
|
||||
/// connection is not dropped during long MDBX commits that block the server from responding
|
||||
/// to pings.
|
||||
pub(crate) async fn setup_persistence_subscription(
|
||||
ws_url: Url,
|
||||
persistence_timeout: Duration,
|
||||
) -> eyre::Result<PersistenceSubscription> {
|
||||
info!(target: "reth-bench", "Connecting to WebSocket at {} for persistence subscription", ws_url);
|
||||
|
||||
let ws_connect = WsConnect::new(ws_url.to_string());
|
||||
let ws_connect =
|
||||
WsConnect::new(ws_url.to_string()).with_keepalive_interval(persistence_timeout);
|
||||
let client = RpcClient::connect_pubsub(ws_connect)
|
||||
.await
|
||||
.wrap_err("Failed to connect to WebSocket RPC endpoint")?;
|
||||
|
||||
@@ -14,13 +14,13 @@
|
||||
use crate::{
|
||||
authenticated_transport::AuthenticatedTransportConnect,
|
||||
bench::{
|
||||
helpers::parse_duration,
|
||||
output::{
|
||||
write_benchmark_results, CombinedResult, GasRampPayloadFile, NewPayloadResult,
|
||||
TotalGasOutput, TotalGasRow,
|
||||
},
|
||||
persistence_waiter::{
|
||||
derive_ws_rpc_url, setup_persistence_subscription, PersistenceWaiter,
|
||||
PERSISTENCE_CHECKPOINT_TIMEOUT,
|
||||
},
|
||||
},
|
||||
valid_payload::{call_forkchoice_updated, call_new_payload},
|
||||
@@ -31,7 +31,6 @@ use alloy_rpc_client::ClientBuilder;
|
||||
use alloy_rpc_types_engine::{ExecutionPayloadEnvelopeV4, ForkchoiceState, JwtSecret};
|
||||
use clap::Parser;
|
||||
use eyre::Context;
|
||||
use humantime::parse_duration;
|
||||
use reth_cli_runner::CliContext;
|
||||
use reth_engine_primitives::config::DEFAULT_PERSISTENCE_THRESHOLD;
|
||||
use reth_node_api::EngineApiMessageVersion;
|
||||
@@ -79,6 +78,9 @@ pub struct Command {
|
||||
output: Option<PathBuf>,
|
||||
|
||||
/// How long to wait after a forkchoice update before sending the next payload.
|
||||
///
|
||||
/// Accepts a duration string (e.g. `100ms`, `2s`) or a bare integer treated as
|
||||
/// milliseconds (e.g. `400`).
|
||||
#[arg(long, value_name = "WAIT_TIME", value_parser = parse_duration, verbatim_doc_comment)]
|
||||
wait_time: Option<Duration>,
|
||||
|
||||
@@ -105,6 +107,19 @@ pub struct Command {
|
||||
)]
|
||||
persistence_threshold: u64,
|
||||
|
||||
/// Timeout for waiting on persistence at each checkpoint.
|
||||
///
|
||||
/// Must be long enough to account for the persistence thread being blocked
|
||||
/// by pruning after the previous save.
|
||||
#[arg(
|
||||
long = "persistence-timeout",
|
||||
value_name = "PERSISTENCE_TIMEOUT",
|
||||
value_parser = parse_duration,
|
||||
default_value = "120s",
|
||||
verbatim_doc_comment
|
||||
)]
|
||||
persistence_timeout: Duration,
|
||||
|
||||
/// Optional `WebSocket` RPC URL for persistence subscription.
|
||||
/// If not provided, derives from engine RPC URL by changing scheme to ws and port to 8546.
|
||||
#[arg(long, value_name = "WS_RPC_URL", verbatim_doc_comment)]
|
||||
@@ -154,22 +169,22 @@ impl Command {
|
||||
let mut waiter = match (self.wait_time, self.wait_for_persistence) {
|
||||
(Some(duration), true) => {
|
||||
let ws_url = derive_ws_rpc_url(self.ws_rpc_url.as_deref(), &self.engine_rpc_url)?;
|
||||
let sub = setup_persistence_subscription(ws_url).await?;
|
||||
let sub = setup_persistence_subscription(ws_url, self.persistence_timeout).await?;
|
||||
Some(PersistenceWaiter::with_duration_and_subscription(
|
||||
duration,
|
||||
sub,
|
||||
self.persistence_threshold,
|
||||
PERSISTENCE_CHECKPOINT_TIMEOUT,
|
||||
self.persistence_timeout,
|
||||
))
|
||||
}
|
||||
(Some(duration), false) => Some(PersistenceWaiter::with_duration(duration)),
|
||||
(None, true) => {
|
||||
let ws_url = derive_ws_rpc_url(self.ws_rpc_url.as_deref(), &self.engine_rpc_url)?;
|
||||
let sub = setup_persistence_subscription(ws_url).await?;
|
||||
let sub = setup_persistence_subscription(ws_url, self.persistence_timeout).await?;
|
||||
Some(PersistenceWaiter::with_subscription(
|
||||
sub,
|
||||
self.persistence_threshold,
|
||||
PERSISTENCE_CHECKPOINT_TIMEOUT,
|
||||
self.persistence_timeout,
|
||||
))
|
||||
}
|
||||
(None, false) => None,
|
||||
@@ -326,7 +341,8 @@ impl Command {
|
||||
};
|
||||
|
||||
let current_duration = total_benchmark_duration.elapsed();
|
||||
info!(target: "reth-bench", %combined_result);
|
||||
let progress = format!("{}/{}", i + 1, payloads.len());
|
||||
info!(target: "reth-bench", progress, %combined_result);
|
||||
|
||||
if let Some(w) = &mut waiter {
|
||||
w.on_block(block_number).await?;
|
||||
|
||||
@@ -20,6 +20,19 @@ impl BenchMode {
|
||||
}
|
||||
}
|
||||
|
||||
/// Returns the total number of blocks in the benchmark, if known.
|
||||
///
|
||||
/// For [`BenchMode::Range`] this is the length of the range.
|
||||
/// For [`BenchMode::Continuous`] the total is unbounded, so `None` is returned.
|
||||
pub const fn total_blocks(&self) -> Option<u64> {
|
||||
match self {
|
||||
Self::Continuous(_) => None,
|
||||
Self::Range(range) => {
|
||||
Some(range.end().saturating_sub(*range.start()).saturating_add(1))
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Create a [`BenchMode`] from optional `from` and `to` fields.
|
||||
pub fn new(from: Option<u64>, to: Option<u64>, latest_block: u64) -> Result<Self, eyre::Error> {
|
||||
// If neither `--from` nor `--to` are provided, we will run the benchmark continuously,
|
||||
|
||||
@@ -33,7 +33,6 @@ reth-chainspec.workspace = true
|
||||
reth-primitives.workspace = true
|
||||
reth-db = { workspace = true, features = ["mdbx"] }
|
||||
reth-provider.workspace = true
|
||||
reth-evm.workspace = true
|
||||
reth-revm.workspace = true
|
||||
reth-transaction-pool.workspace = true
|
||||
reth-cli-runner.workspace = true
|
||||
@@ -53,32 +52,31 @@ reth-payload-primitives.workspace = true
|
||||
reth-node-api.workspace = true
|
||||
reth-node-core.workspace = true
|
||||
reth-ethereum-payload-builder.workspace = true
|
||||
reth-ethereum-primitives.workspace = true
|
||||
reth-node-ethereum.workspace = true
|
||||
reth-node-builder.workspace = true
|
||||
reth-node-metrics.workspace = true
|
||||
reth-consensus.workspace = true
|
||||
reth-tokio-util.workspace = true
|
||||
reth-ress-protocol.workspace = true
|
||||
reth-ress-provider.workspace = true
|
||||
|
||||
# alloy
|
||||
alloy-primitives.workspace = true
|
||||
alloy-rpc-types = { workspace = true, features = ["engine"] }
|
||||
|
||||
# tracing
|
||||
tracing.workspace = true
|
||||
|
||||
# async
|
||||
tokio = { workspace = true, features = ["sync", "macros", "time", "rt-multi-thread"] }
|
||||
|
||||
# misc
|
||||
aquamarine.workspace = true
|
||||
clap = { workspace = true, features = ["derive", "env"] }
|
||||
eyre.workspace = true
|
||||
|
||||
[dev-dependencies]
|
||||
alloy-node-bindings = "1.6.3"
|
||||
alloy-provider = { workspace = true, features = ["reqwest"] }
|
||||
alloy-rpc-types-eth.workspace = true
|
||||
backon.workspace = true
|
||||
serde_json.workspace = true
|
||||
tempfile.workspace = true
|
||||
tokio = { workspace = true, features = ["macros", "rt-multi-thread"] }
|
||||
toml.workspace = true
|
||||
|
||||
[features]
|
||||
default = [
|
||||
@@ -89,6 +87,7 @@ default = [
|
||||
"js-tracer",
|
||||
"keccak-cache-global",
|
||||
"asm-keccak",
|
||||
"min-debug-logs",
|
||||
"rocksdb",
|
||||
]
|
||||
|
||||
@@ -107,7 +106,6 @@ js-tracer = [
|
||||
"reth-rpc-eth-types/js-tracer",
|
||||
]
|
||||
|
||||
debug-jitter = ["reth-node-builder/debug-jitter"]
|
||||
dev = ["reth-ethereum-cli/dev"]
|
||||
|
||||
asm-keccak = [
|
||||
@@ -115,10 +113,12 @@ asm-keccak = [
|
||||
"reth-primitives/asm-keccak",
|
||||
"reth-ethereum-cli/asm-keccak",
|
||||
"reth-node-ethereum/asm-keccak",
|
||||
"alloy-primitives/asm-keccak",
|
||||
]
|
||||
keccak-cache-global = [
|
||||
"reth-node-core/keccak-cache-global",
|
||||
"reth-node-ethereum/keccak-cache-global",
|
||||
"alloy-primitives/keccak-cache-global",
|
||||
]
|
||||
jemalloc = [
|
||||
"reth-cli-util/jemalloc",
|
||||
|
||||
@@ -51,6 +51,9 @@
|
||||
#![cfg_attr(not(test), warn(unused_crate_dependencies))]
|
||||
#![cfg_attr(docsrs, feature(doc_cfg))]
|
||||
|
||||
// Used in feature flags only (`asm-keccak`, `keccak-cache-global`)
|
||||
use alloy_primitives as _;
|
||||
|
||||
pub mod cli;
|
||||
|
||||
/// Re-exported utils.
|
||||
@@ -205,12 +208,9 @@ pub mod rpc {
|
||||
}
|
||||
}
|
||||
|
||||
/// Ress subprotocol installation.
|
||||
pub mod ress;
|
||||
|
||||
// re-export for convenience
|
||||
#[doc(inline)]
|
||||
pub use reth_cli_runner::{tokio_runtime, CliContext, CliRunner};
|
||||
pub use reth_cli_runner::{CliContext, CliRunner};
|
||||
|
||||
// for rendering diagrams
|
||||
use aquamarine as _;
|
||||
@@ -218,3 +218,4 @@ use aquamarine as _;
|
||||
// used in main
|
||||
use clap as _;
|
||||
use reth_cli_util as _;
|
||||
use tracing as _;
|
||||
|
||||
@@ -8,9 +8,8 @@ static ALLOC: reth_cli_util::allocator::Allocator = reth_cli_util::allocator::ne
|
||||
static MALLOC_CONF: &[u8] = b"prof:true,prof_active:true,lg_prof_sample:19\0";
|
||||
|
||||
use clap::Parser;
|
||||
use reth::{args::RessArgs, cli::Cli, ress::install_ress_subprotocol};
|
||||
use reth::cli::Cli;
|
||||
use reth_ethereum_cli::chainspec::EthereumChainSpecParser;
|
||||
use reth_node_builder::NodeHandle;
|
||||
use reth_node_ethereum::EthereumNode;
|
||||
use tracing::info;
|
||||
|
||||
@@ -22,27 +21,12 @@ fn main() {
|
||||
unsafe { std::env::set_var("RUST_BACKTRACE", "1") };
|
||||
}
|
||||
|
||||
if let Err(err) =
|
||||
Cli::<EthereumChainSpecParser, RessArgs>::parse().run(async move |builder, ress_args| {
|
||||
info!(target: "reth::cli", "Launching node");
|
||||
let NodeHandle { node, node_exit_future } =
|
||||
builder.node(EthereumNode::default()).launch_with_debug_capabilities().await?;
|
||||
if let Err(err) = Cli::<EthereumChainSpecParser>::parse().run(async move |builder, _| {
|
||||
info!(target: "reth::cli", "Launching node");
|
||||
let handle = builder.node(EthereumNode::default()).launch_with_debug_capabilities().await?;
|
||||
|
||||
// Install ress subprotocol.
|
||||
if ress_args.enabled {
|
||||
install_ress_subprotocol(
|
||||
ress_args,
|
||||
node.provider,
|
||||
node.evm_config,
|
||||
node.network,
|
||||
node.task_executor,
|
||||
node.add_ons_handle.engine_events.new_listener(),
|
||||
)?;
|
||||
}
|
||||
|
||||
node_exit_future.await
|
||||
})
|
||||
{
|
||||
handle.wait_for_node_exit().await
|
||||
}) {
|
||||
eprintln!("Error: {err:?}");
|
||||
std::process::exit(1);
|
||||
}
|
||||
|
||||
@@ -1,67 +0,0 @@
|
||||
use reth_ethereum_primitives::EthPrimitives;
|
||||
use reth_evm::ConfigureEvm;
|
||||
use reth_network::{protocol::IntoRlpxSubProtocol, NetworkProtocols};
|
||||
use reth_network_api::FullNetwork;
|
||||
use reth_node_api::ConsensusEngineEvent;
|
||||
use reth_node_core::args::RessArgs;
|
||||
use reth_provider::providers::{BlockchainProvider, ProviderNodeTypes};
|
||||
use reth_ress_protocol::{NodeType, ProtocolState, RessProtocolHandler};
|
||||
use reth_ress_provider::{maintain_pending_state, PendingState, RethRessProtocolProvider};
|
||||
use reth_tasks::TaskExecutor;
|
||||
use reth_tokio_util::EventStream;
|
||||
use tokio::sync::mpsc;
|
||||
use tracing::*;
|
||||
|
||||
/// Install `ress` subprotocol if it's enabled.
|
||||
pub fn install_ress_subprotocol<P, E, N>(
|
||||
args: RessArgs,
|
||||
provider: BlockchainProvider<P>,
|
||||
evm_config: E,
|
||||
network: N,
|
||||
task_executor: TaskExecutor,
|
||||
engine_events: EventStream<ConsensusEngineEvent<EthPrimitives>>,
|
||||
) -> eyre::Result<()>
|
||||
where
|
||||
P: ProviderNodeTypes<Primitives = EthPrimitives>,
|
||||
E: ConfigureEvm<Primitives = EthPrimitives> + Clone + 'static,
|
||||
N: FullNetwork + NetworkProtocols,
|
||||
{
|
||||
info!(target: "reth::cli", "Installing ress subprotocol");
|
||||
let pending_state = PendingState::default();
|
||||
|
||||
// Spawn maintenance task for pending state.
|
||||
task_executor.spawn(maintain_pending_state(
|
||||
engine_events,
|
||||
provider.clone(),
|
||||
pending_state.clone(),
|
||||
));
|
||||
|
||||
let (tx, mut rx) = mpsc::unbounded_channel();
|
||||
let provider = RethRessProtocolProvider::new(
|
||||
provider,
|
||||
evm_config,
|
||||
Box::new(task_executor.clone()),
|
||||
args.max_witness_window,
|
||||
args.witness_max_parallel,
|
||||
args.witness_cache_size,
|
||||
pending_state,
|
||||
)?;
|
||||
network.add_rlpx_sub_protocol(
|
||||
RessProtocolHandler {
|
||||
provider,
|
||||
node_type: NodeType::Stateful,
|
||||
peers_handle: network.peers_handle().clone(),
|
||||
max_active_connections: args.max_active_connections,
|
||||
state: ProtocolState::new(tx),
|
||||
}
|
||||
.into_rlpx_sub_protocol(),
|
||||
);
|
||||
info!(target: "reth::cli", "Ress subprotocol support enabled");
|
||||
|
||||
task_executor.spawn(async move {
|
||||
while let Some(event) = rx.recv().await {
|
||||
trace!(target: "reth::ress", ?event, "Received ress event");
|
||||
}
|
||||
});
|
||||
Ok(())
|
||||
}
|
||||
255
bin/reth/tests/it/main.rs
Normal file
255
bin/reth/tests/it/main.rs
Normal file
@@ -0,0 +1,255 @@
|
||||
#![allow(missing_docs)]
|
||||
|
||||
use std::process::Command;
|
||||
|
||||
const RETH: &str = env!("CARGO_BIN_EXE_reth");
|
||||
|
||||
// ── Helpers ──────────────────────────────────────────────────────────────────
|
||||
|
||||
/// Runs `reth <args>` and returns stdout, asserting exit code 0.
|
||||
///
|
||||
/// Tracing is suppressed via `RUST_LOG=off` so that log lines emitted during
|
||||
/// binary startup don't pollute stdout-based assertions.
|
||||
#[track_caller]
|
||||
fn reth_ok(args: &[&str]) -> String {
|
||||
let output = Command::new(RETH).env("RUST_LOG", "off").args(args).output().unwrap();
|
||||
let stdout = String::from_utf8_lossy(&output.stdout);
|
||||
let stderr = String::from_utf8_lossy(&output.stderr);
|
||||
assert!(output.status.success(), "args {args:?} failed.\nstdout: {stdout}\nstderr: {stderr}");
|
||||
stdout.into_owned()
|
||||
}
|
||||
|
||||
/// Spawns an isolated dev-mode reth node.
|
||||
///
|
||||
/// Discovery is disabled and peer limits are zeroed so the node is fully
|
||||
/// isolated. Each call gets a unique temporary data directory so that
|
||||
/// concurrent test runs never collide on the default `reth/dev/` path.
|
||||
fn spawn_dev() -> (alloy_node_bindings::RethInstance, tempfile::TempDir) {
|
||||
use alloy_node_bindings::Reth;
|
||||
|
||||
let datadir = tempfile::tempdir().expect("failed to create temp dir");
|
||||
|
||||
let instance = Reth::at(RETH)
|
||||
.dev()
|
||||
.disable_discovery()
|
||||
.data_dir(datadir.path())
|
||||
.args(["--max-outbound-peers", "0", "--max-inbound-peers", "0"])
|
||||
.spawn();
|
||||
|
||||
// Return the TempDir alongside the instance so it lives as long as the node.
|
||||
(instance, datadir)
|
||||
}
|
||||
|
||||
// ── Original tests (from PR #22069) ──────────────────────────────────────────
|
||||
|
||||
#[test]
|
||||
fn help() {
|
||||
let stdout = reth_ok(&["--help"]);
|
||||
assert!(stdout.contains("Usage"), "stdout: {stdout}");
|
||||
assert!(stdout.contains("node"), "stdout: {stdout}");
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn version() {
|
||||
let stdout = reth_ok(&["--version"]);
|
||||
assert!(stdout.to_lowercase().contains("reth"), "stdout: {stdout}");
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn node_help() {
|
||||
let stdout = reth_ok(&["node", "--help"]);
|
||||
assert!(stdout.contains("--dev"), "stdout: {stdout}");
|
||||
assert!(stdout.contains("--http"), "stdout: {stdout}");
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn unknown_subcommand() {
|
||||
let output = Command::new(RETH).arg("definitely-not-a-cmd").output().unwrap();
|
||||
assert!(!output.status.success());
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn unknown_flag() {
|
||||
let output = Command::new(RETH).args(["node", "--no-such-flag"]).output().unwrap();
|
||||
let stderr = String::from_utf8_lossy(&output.stderr);
|
||||
assert!(!output.status.success());
|
||||
assert!(stderr.contains("--no-such-flag"), "stderr: {stderr}");
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn dev_node_eth_syncing() {
|
||||
use alloy_provider::{Provider, ProviderBuilder};
|
||||
|
||||
let (reth, _datadir) = spawn_dev();
|
||||
let provider = ProviderBuilder::new().connect_http(reth.endpoint().parse().unwrap());
|
||||
|
||||
tokio::time::sleep(std::time::Duration::from_secs(1)).await;
|
||||
|
||||
let _syncing = provider.syncing().await.expect("eth_syncing failed");
|
||||
}
|
||||
|
||||
// ── Subcommand --help coverage ───────────────────────────────────────────────
|
||||
//
|
||||
// Every registered subcommand must produce valid --help output. This catches
|
||||
// clap wiring regressions (e.g. a missing field, a conflicting arg name, or a
|
||||
// broken `help_message()` call) that would otherwise only surface when a user
|
||||
// runs the command.
|
||||
|
||||
#[test]
|
||||
fn init_help() {
|
||||
let stdout = reth_ok(&["init", "--help"]);
|
||||
assert!(stdout.contains("--chain"), "stdout: {stdout}");
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn init_state_help() {
|
||||
let stdout = reth_ok(&["init-state", "--help"]);
|
||||
assert!(stdout.contains("--chain"), "stdout: {stdout}");
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn import_help() {
|
||||
let stdout = reth_ok(&["import", "--help"]);
|
||||
assert!(stdout.contains("--chain"), "stdout: {stdout}");
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn import_era_help() {
|
||||
let stdout = reth_ok(&["import-era", "--help"]);
|
||||
assert!(stdout.contains("--chain"), "stdout: {stdout}");
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn export_era_help() {
|
||||
let stdout = reth_ok(&["export-era", "--help"]);
|
||||
assert!(stdout.contains("--chain"), "stdout: {stdout}");
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn dump_genesis_help() {
|
||||
let stdout = reth_ok(&["dump-genesis", "--help"]);
|
||||
assert!(stdout.contains("--chain"), "stdout: {stdout}");
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn db_help() {
|
||||
let stdout = reth_ok(&["db", "--help"]);
|
||||
assert!(stdout.contains("stats"), "stdout: {stdout}");
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn stage_help() {
|
||||
let stdout = reth_ok(&["stage", "--help"]);
|
||||
assert!(stdout.contains("run"), "stdout: {stdout}");
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn p2p_help() {
|
||||
let stdout = reth_ok(&["p2p", "--help"]);
|
||||
assert!(stdout.contains("header"), "stdout: {stdout}");
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn config_help() {
|
||||
let stdout = reth_ok(&["config", "--help"]);
|
||||
assert!(stdout.contains("--default"), "stdout: {stdout}");
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn prune_help() {
|
||||
let stdout = reth_ok(&["prune", "--help"]);
|
||||
assert!(stdout.contains("--chain"), "stdout: {stdout}");
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn download_help() {
|
||||
let stdout = reth_ok(&["download", "--help"]);
|
||||
assert!(stdout.contains("--chain"), "stdout: {stdout}");
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn re_execute_help() {
|
||||
let stdout = reth_ok(&["re-execute", "--help"]);
|
||||
assert!(stdout.contains("--chain"), "stdout: {stdout}");
|
||||
}
|
||||
|
||||
// ── `config --default` outputs valid TOML ────────────────────────────────────
|
||||
|
||||
#[test]
|
||||
fn config_default_valid_toml() {
|
||||
let stdout = reth_ok(&["config", "--default"]);
|
||||
|
||||
let parsed: toml::Value =
|
||||
toml::from_str(&stdout).expect("config --default did not produce valid TOML");
|
||||
|
||||
// The default config must contain the [stages] table — this is the heart of
|
||||
// the pipeline configuration and its absence would indicate a serialization
|
||||
// regression.
|
||||
assert!(parsed.get("stages").is_some(), "missing [stages] in config output");
|
||||
}
|
||||
|
||||
// ── `dump-genesis` outputs valid JSON ────────────────────────────────────────
|
||||
|
||||
#[test]
|
||||
fn dump_genesis_mainnet_valid_json() {
|
||||
let stdout = reth_ok(&["dump-genesis"]);
|
||||
|
||||
let genesis: serde_json::Value =
|
||||
serde_json::from_str(&stdout).expect("dump-genesis did not produce valid JSON");
|
||||
|
||||
assert!(genesis.get("nonce").is_some(), "missing nonce in genesis JSON");
|
||||
assert!(genesis.get("alloc").is_some(), "missing alloc in genesis JSON");
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn dump_genesis_sepolia_valid_json() {
|
||||
let stdout = reth_ok(&["dump-genesis", "--chain", "sepolia"]);
|
||||
|
||||
let genesis: serde_json::Value = serde_json::from_str(&stdout)
|
||||
.expect("dump-genesis --chain sepolia did not produce valid JSON");
|
||||
|
||||
assert!(genesis.get("alloc").is_some(), "missing alloc in sepolia genesis JSON");
|
||||
}
|
||||
|
||||
// ── Dev node: send transaction round-trip ────────────────────────────────────
|
||||
//
|
||||
// Exercises the full pipeline: RPC submission → mempool → sealing → execution →
|
||||
// receipt retrieval. Uses the pre-funded dev account so no genesis customization
|
||||
// is required.
|
||||
|
||||
#[tokio::test]
|
||||
async fn dev_node_send_tx_and_mine() {
|
||||
use alloy_primitives::{Address, U256};
|
||||
use alloy_provider::{Provider, ProviderBuilder};
|
||||
use alloy_rpc_types_eth::TransactionRequest;
|
||||
|
||||
let (reth, _datadir) = spawn_dev();
|
||||
let provider = ProviderBuilder::new().connect_http(reth.endpoint().parse().unwrap());
|
||||
|
||||
tokio::time::sleep(std::time::Duration::from_secs(1)).await;
|
||||
|
||||
// Dev mode pre-funds the first dev account.
|
||||
let accounts = provider.get_accounts().await.expect("eth_accounts failed");
|
||||
assert!(!accounts.is_empty(), "dev node should expose at least one account");
|
||||
|
||||
let sender = accounts[0];
|
||||
let recipient = Address::with_last_byte(0x42);
|
||||
|
||||
let tx = TransactionRequest::default().from(sender).to(recipient).value(U256::from(1_000_000));
|
||||
|
||||
let tx_hash = provider.send_transaction(tx).await.expect("eth_sendTransaction failed");
|
||||
|
||||
// In dev/instant-mine mode the node seals a block for each transaction, so
|
||||
// the receipt becomes available almost immediately.
|
||||
let receipt = tx_hash.get_receipt().await.expect("failed to get receipt");
|
||||
|
||||
assert!(receipt.status(), "transaction should have succeeded");
|
||||
assert_eq!(receipt.to, Some(recipient));
|
||||
assert!(receipt.block_number.unwrap() > 0, "receipt should be in a mined block");
|
||||
|
||||
// Verify the transfer actually mutated state.
|
||||
let balance = provider.get_balance(recipient).await.expect("eth_getBalance failed");
|
||||
assert_eq!(balance, U256::from(1_000_000));
|
||||
}
|
||||
|
||||
const fn main() {}
|
||||
@@ -312,6 +312,11 @@ impl DeferredTrieData {
|
||||
/// Given that invariant, circular wait dependencies are impossible.
|
||||
#[instrument(level = "debug", target = "engine::tree::deferred_trie", skip_all)]
|
||||
pub fn wait_cloned(&self) -> ComputedTrieData {
|
||||
#[cfg(feature = "rayon")]
|
||||
debug_assert!(
|
||||
rayon::current_thread_index().is_none(),
|
||||
"wait_cloned must not be called from a rayon worker thread"
|
||||
);
|
||||
let mut state = self.state.lock();
|
||||
match &mut *state {
|
||||
// If the deferred trie data is ready, return the cached result.
|
||||
|
||||
@@ -1061,6 +1061,14 @@ mod tests {
|
||||
) -> ProviderResult<Option<StorageValue>> {
|
||||
Ok(None)
|
||||
}
|
||||
|
||||
fn storage_by_hashed_key(
|
||||
&self,
|
||||
_address: Address,
|
||||
_hashed_storage_key: StorageKey,
|
||||
) -> ProviderResult<Option<StorageValue>> {
|
||||
Ok(None)
|
||||
}
|
||||
}
|
||||
|
||||
impl BytecodeReader for MockStateProvider {
|
||||
|
||||
@@ -223,6 +223,26 @@ impl<N: NodePrimitives> StateProvider for MemoryOverlayStateProviderRef<'_, N> {
|
||||
|
||||
self.historical.storage(address, storage_key)
|
||||
}
|
||||
|
||||
fn storage_by_hashed_key(
|
||||
&self,
|
||||
address: Address,
|
||||
hashed_storage_key: StorageKey,
|
||||
) -> ProviderResult<Option<StorageValue>> {
|
||||
let hashed_address = keccak256(address);
|
||||
let state = &self.trie_input().state;
|
||||
|
||||
if let Some(hs) = state.storages.get(&hashed_address) {
|
||||
if let Some(value) = hs.storage.get(&hashed_storage_key) {
|
||||
return Ok(Some(*value));
|
||||
}
|
||||
if hs.wiped {
|
||||
return Ok(Some(StorageValue::ZERO));
|
||||
}
|
||||
}
|
||||
|
||||
self.historical.storage_by_hashed_key(address, hashed_storage_key)
|
||||
}
|
||||
}
|
||||
|
||||
impl<N: NodePrimitives> BytecodeReader for MemoryOverlayStateProviderRef<'_, N> {
|
||||
|
||||
@@ -39,10 +39,7 @@ use reth_ethereum_forks::{
|
||||
ChainHardforks, DisplayHardforks, EthereumHardfork, EthereumHardforks, ForkCondition,
|
||||
ForkFilter, ForkFilterKey, ForkHash, ForkId, Hardfork, Hardforks, Head, DEV_HARDFORKS,
|
||||
};
|
||||
use reth_network_peers::{
|
||||
holesky_nodes, hoodi_nodes, mainnet_nodes, op_nodes, op_testnet_nodes, sepolia_nodes,
|
||||
NodeRecord,
|
||||
};
|
||||
use reth_network_peers::{holesky_nodes, hoodi_nodes, mainnet_nodes, sepolia_nodes, NodeRecord};
|
||||
use reth_primitives_traits::{sync::LazyLock, BlockHeader, SealedHeader};
|
||||
|
||||
/// Helper method building a [`Header`] given [`Genesis`] and [`ChainHardforks`].
|
||||
@@ -780,15 +777,6 @@ impl<H: BlockHeader> ChainSpec<H> {
|
||||
C::Sepolia => Some(sepolia_nodes()),
|
||||
C::Holesky => Some(holesky_nodes()),
|
||||
C::Hoodi => Some(hoodi_nodes()),
|
||||
// opstack uses the same bootnodes for all chains: <https://github.com/paradigmxyz/reth/issues/14603>
|
||||
C::Base | C::Optimism | C::Unichain | C::World => Some(op_nodes()),
|
||||
C::OptimismSepolia | C::BaseSepolia | C::UnichainSepolia | C::WorldSepolia => {
|
||||
Some(op_testnet_nodes())
|
||||
}
|
||||
|
||||
// fallback for optimism chains
|
||||
chain if chain.is_optimism() && chain.is_testnet() => Some(op_testnet_nodes()),
|
||||
chain if chain.is_optimism() => Some(op_nodes()),
|
||||
_ => None,
|
||||
}
|
||||
}
|
||||
|
||||
@@ -66,7 +66,8 @@ pub trait RethCli: Sized {
|
||||
F: FnOnce(Self, CliRunner) -> R,
|
||||
{
|
||||
let cli = Self::parse_args()?;
|
||||
let runner = CliRunner::try_default_runtime()?;
|
||||
let runner = CliRunner::try_default_runtime()
|
||||
.map_err(|e| Error::raw(clap::error::ErrorKind::Io, e))?;
|
||||
Ok(cli.with_runner(f, runner))
|
||||
}
|
||||
|
||||
|
||||
@@ -19,7 +19,7 @@ use reth_node_builder::{
|
||||
Node, NodeComponents, NodeComponentsBuilder, NodeTypes, NodeTypesWithDBAdapter,
|
||||
};
|
||||
use reth_node_core::{
|
||||
args::{DatabaseArgs, DatadirArgs, RocksDbArgs, StaticFilesArgs, StorageArgs},
|
||||
args::{DatabaseArgs, DatadirArgs, StaticFilesArgs, StorageArgs},
|
||||
dirs::{ChainPath, DataDirPath},
|
||||
};
|
||||
use reth_provider::{
|
||||
@@ -67,70 +67,35 @@ pub struct EnvironmentArgs<C: ChainSpecParser> {
|
||||
#[command(flatten)]
|
||||
pub static_files: StaticFilesArgs,
|
||||
|
||||
/// All `RocksDB` related arguments
|
||||
#[command(flatten)]
|
||||
pub rocksdb: RocksDbArgs,
|
||||
|
||||
/// Storage mode configuration (v2 vs v1/legacy)
|
||||
#[command(flatten)]
|
||||
pub storage: StorageArgs,
|
||||
}
|
||||
|
||||
impl<C: ChainSpecParser> EnvironmentArgs<C> {
|
||||
/// Returns the effective storage settings derived from `--storage.v2`, static-file, and
|
||||
/// `RocksDB` CLI args.
|
||||
/// Returns the effective storage settings derived from `--storage.v2`.
|
||||
///
|
||||
/// The base storage mode is determined by `--storage.v2`:
|
||||
/// - When `--storage.v2` is set: uses [`StorageSettings::v2()`] defaults
|
||||
/// - Otherwise: uses [`StorageSettings::v1()`] defaults
|
||||
///
|
||||
/// Individual `--static-files.*` and `--rocksdb.*` flags override the base when explicitly set.
|
||||
/// - Otherwise: uses [`StorageSettings::base()`] defaults
|
||||
pub fn storage_settings(&self) -> StorageSettings {
|
||||
let mut s = if self.storage.v2 { StorageSettings::v2() } else { StorageSettings::base() };
|
||||
|
||||
// Apply static files overrides (only when explicitly set)
|
||||
if let Some(v) = self.static_files.receipts {
|
||||
s = s.with_receipts_in_static_files(v);
|
||||
if self.storage.v2 {
|
||||
StorageSettings::v2()
|
||||
} else {
|
||||
StorageSettings::base()
|
||||
}
|
||||
if let Some(v) = self.static_files.transaction_senders {
|
||||
s = s.with_transaction_senders_in_static_files(v);
|
||||
}
|
||||
if let Some(v) = self.static_files.account_changesets {
|
||||
s = s.with_account_changesets_in_static_files(v);
|
||||
}
|
||||
if let Some(v) = self.static_files.storage_changesets {
|
||||
s = s.with_storage_changesets_in_static_files(v);
|
||||
}
|
||||
|
||||
// Apply rocksdb overrides
|
||||
// --rocksdb.all sets all rocksdb flags to true
|
||||
if self.rocksdb.all {
|
||||
s = s
|
||||
.with_transaction_hash_numbers_in_rocksdb(true)
|
||||
.with_storages_history_in_rocksdb(true)
|
||||
.with_account_history_in_rocksdb(true);
|
||||
}
|
||||
|
||||
// Individual rocksdb flags override --rocksdb.all when explicitly set
|
||||
if let Some(v) = self.rocksdb.tx_hash {
|
||||
s = s.with_transaction_hash_numbers_in_rocksdb(v);
|
||||
}
|
||||
if let Some(v) = self.rocksdb.storages_history {
|
||||
s = s.with_storages_history_in_rocksdb(v);
|
||||
}
|
||||
if let Some(v) = self.rocksdb.account_history {
|
||||
s = s.with_account_history_in_rocksdb(v);
|
||||
}
|
||||
|
||||
s
|
||||
}
|
||||
|
||||
/// Initializes environment according to [`AccessRights`] and returns an instance of
|
||||
/// [`Environment`].
|
||||
///
|
||||
/// Internally builds a [`reth_tasks::Runtime`] attached to the current tokio handle for
|
||||
/// parallel storage I/O.
|
||||
pub fn init<N: CliNodeTypes>(&self, access: AccessRights) -> eyre::Result<Environment<N>>
|
||||
where
|
||||
C: ChainSpecParser<ChainSpec = N::ChainSpec>,
|
||||
{
|
||||
let runtime = reth_tasks::Runtime::with_existing_handle(tokio::runtime::Handle::current())?;
|
||||
let data_dir = self.datadir.clone().resolve_datadir(self.chain.chain());
|
||||
let db_path = data_dir.db();
|
||||
let sf_path = data_dir.static_files();
|
||||
@@ -186,7 +151,7 @@ impl<C: ChainSpecParser> EnvironmentArgs<C> {
|
||||
.build()?;
|
||||
|
||||
let provider_factory =
|
||||
self.create_provider_factory(&config, db, sfp, rocksdb_provider, access)?;
|
||||
self.create_provider_factory(&config, db, sfp, rocksdb_provider, access, runtime)?;
|
||||
if access.is_read_write() {
|
||||
debug!(target: "reth::cli", chain=%self.chain.chain(), genesis=?self.chain.genesis_hash(), "Initializing genesis");
|
||||
init_genesis_with_settings(&provider_factory, self.storage_settings())?;
|
||||
@@ -207,6 +172,7 @@ impl<C: ChainSpecParser> EnvironmentArgs<C> {
|
||||
static_file_provider: StaticFileProvider<N::Primitives>,
|
||||
rocksdb_provider: RocksDBProvider,
|
||||
access: AccessRights,
|
||||
runtime: reth_tasks::Runtime,
|
||||
) -> eyre::Result<ProviderFactory<NodeTypesWithDBAdapter<N, DatabaseEnv>>>
|
||||
where
|
||||
C: ChainSpecParser<ChainSpec = N::ChainSpec>,
|
||||
@@ -217,6 +183,7 @@ impl<C: ChainSpecParser> EnvironmentArgs<C> {
|
||||
self.chain.clone(),
|
||||
static_file_provider,
|
||||
rocksdb_provider,
|
||||
runtime,
|
||||
)?
|
||||
.with_prune_modes(prune_modes.clone());
|
||||
|
||||
|
||||
@@ -5,6 +5,7 @@ use reth_codecs::Compact;
|
||||
use reth_db_api::{cursor::DbDupCursorRO, database::Database, tables, transaction::DbTx};
|
||||
use reth_db_common::DbTool;
|
||||
use reth_node_builder::NodeTypesWithDB;
|
||||
use reth_storage_api::StorageSettingsCache;
|
||||
use std::time::{Duration, Instant};
|
||||
use tracing::info;
|
||||
|
||||
@@ -22,52 +23,94 @@ impl Command {
|
||||
/// Execute `db account-storage` command
|
||||
pub fn execute<N: NodeTypesWithDB>(self, tool: &DbTool<N>) -> eyre::Result<()> {
|
||||
let address = self.address;
|
||||
let (slot_count, plain_size) = tool.provider_factory.db_ref().view(|tx| {
|
||||
let mut cursor = tx.cursor_dup_read::<tables::PlainStorageState>()?;
|
||||
let mut count = 0usize;
|
||||
let mut total_value_bytes = 0usize;
|
||||
let mut last_log = Instant::now();
|
||||
let use_hashed_state = tool.provider_factory.cached_storage_settings().use_hashed_state();
|
||||
|
||||
// Walk all storage entries for this address
|
||||
let walker = cursor.walk_dup(Some(address), None)?;
|
||||
for entry in walker {
|
||||
let (_, storage_entry) = entry?;
|
||||
count += 1;
|
||||
// StorageEntry encodes as: 32 bytes (key/subkey uncompressed) + compressed U256
|
||||
let mut buf = Vec::new();
|
||||
let entry_len = storage_entry.to_compact(&mut buf);
|
||||
total_value_bytes += entry_len;
|
||||
let (slot_count, storage_size) = if use_hashed_state {
|
||||
let hashed_address = keccak256(address);
|
||||
tool.provider_factory.db_ref().view(|tx| {
|
||||
let mut cursor = tx.cursor_dup_read::<tables::HashedStorages>()?;
|
||||
let mut count = 0usize;
|
||||
let mut total_value_bytes = 0usize;
|
||||
let mut last_log = Instant::now();
|
||||
|
||||
if last_log.elapsed() >= LOG_INTERVAL {
|
||||
info!(
|
||||
target: "reth::cli",
|
||||
address = %address,
|
||||
slots = count,
|
||||
key = %storage_entry.key,
|
||||
"Processing storage slots"
|
||||
);
|
||||
last_log = Instant::now();
|
||||
let walker = cursor.walk_dup(Some(hashed_address), None)?;
|
||||
for entry in walker {
|
||||
let (_, storage_entry) = entry?;
|
||||
count += 1;
|
||||
let mut buf = Vec::new();
|
||||
let entry_len = storage_entry.to_compact(&mut buf);
|
||||
total_value_bytes += entry_len;
|
||||
|
||||
if last_log.elapsed() >= LOG_INTERVAL {
|
||||
info!(
|
||||
target: "reth::cli",
|
||||
address = %address,
|
||||
slots = count,
|
||||
key = %storage_entry.key,
|
||||
"Processing hashed storage slots"
|
||||
);
|
||||
last_log = Instant::now();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Add 20 bytes for the Address key (stored once per account in dupsort)
|
||||
let total_size = if count > 0 { 20 + total_value_bytes } else { 0 };
|
||||
let total_size = if count > 0 { 32 + total_value_bytes } else { 0 };
|
||||
|
||||
Ok::<_, eyre::Report>((count, total_size))
|
||||
})??;
|
||||
Ok::<_, eyre::Report>((count, total_size))
|
||||
})??
|
||||
} else {
|
||||
tool.provider_factory.db_ref().view(|tx| {
|
||||
let mut cursor = tx.cursor_dup_read::<tables::PlainStorageState>()?;
|
||||
let mut count = 0usize;
|
||||
let mut total_value_bytes = 0usize;
|
||||
let mut last_log = Instant::now();
|
||||
|
||||
// Estimate hashed storage size: 32-byte B256 key instead of 20-byte Address
|
||||
let hashed_size_estimate = if slot_count > 0 { plain_size + 12 } else { 0 };
|
||||
let total_estimate = plain_size + hashed_size_estimate;
|
||||
// Walk all storage entries for this address
|
||||
let walker = cursor.walk_dup(Some(address), None)?;
|
||||
for entry in walker {
|
||||
let (_, storage_entry) = entry?;
|
||||
count += 1;
|
||||
let mut buf = Vec::new();
|
||||
// StorageEntry encodes as: 32 bytes (key/subkey uncompressed) + compressed U256
|
||||
let entry_len = storage_entry.to_compact(&mut buf);
|
||||
total_value_bytes += entry_len;
|
||||
|
||||
if last_log.elapsed() >= LOG_INTERVAL {
|
||||
info!(
|
||||
target: "reth::cli",
|
||||
address = %address,
|
||||
slots = count,
|
||||
key = %storage_entry.key,
|
||||
"Processing storage slots"
|
||||
);
|
||||
last_log = Instant::now();
|
||||
}
|
||||
}
|
||||
|
||||
// Add 20 bytes for the Address key (stored once per account in dupsort)
|
||||
let total_size = if count > 0 { 20 + total_value_bytes } else { 0 };
|
||||
|
||||
Ok::<_, eyre::Report>((count, total_size))
|
||||
})??
|
||||
};
|
||||
|
||||
let hashed_address = keccak256(address);
|
||||
|
||||
println!("Account: {address}");
|
||||
println!("Hashed address: {hashed_address}");
|
||||
println!("Storage slots: {slot_count}");
|
||||
println!("Plain storage size: {} (estimated)", human_bytes(plain_size as f64));
|
||||
println!("Hashed storage size: {} (estimated)", human_bytes(hashed_size_estimate as f64));
|
||||
println!("Total estimated size: {}", human_bytes(total_estimate as f64));
|
||||
if use_hashed_state {
|
||||
println!("Hashed storage size: {} (estimated)", human_bytes(storage_size as f64));
|
||||
} else {
|
||||
// Estimate hashed storage size: 32-byte B256 key instead of 20-byte Address
|
||||
let hashed_size_estimate = if slot_count > 0 { storage_size + 12 } else { 0 };
|
||||
let total_estimate = storage_size + hashed_size_estimate;
|
||||
println!("Plain storage size: {} (estimated)", human_bytes(storage_size as f64));
|
||||
println!(
|
||||
"Hashed storage size: {} (estimated)",
|
||||
human_bytes(hashed_size_estimate as f64)
|
||||
);
|
||||
println!("Total estimated size: {}", human_bytes(total_estimate as f64));
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
61
crates/cli/commands/src/db/copy.rs
Normal file
61
crates/cli/commands/src/db/copy.rs
Normal file
@@ -0,0 +1,61 @@
|
||||
use clap::Parser;
|
||||
use reth_db::mdbx::{self, ffi};
|
||||
use std::path::PathBuf;
|
||||
|
||||
/// Copies the MDBX database to a new location.
|
||||
///
|
||||
/// Equivalent to the standalone `mdbx_copy` tool but bundled into reth.
|
||||
#[derive(Parser, Debug)]
|
||||
pub struct Command {
|
||||
/// Destination path for the database copy.
|
||||
dest: PathBuf,
|
||||
|
||||
/// Compact the database while copying (reclaims free space).
|
||||
#[arg(short, long)]
|
||||
compact: bool,
|
||||
|
||||
/// Force dynamic size for the destination database.
|
||||
#[arg(short = 'd', long)]
|
||||
force_dynamic_size: bool,
|
||||
|
||||
/// Throttle to avoid MVCC pressure on writers.
|
||||
#[arg(short = 'p', long)]
|
||||
throttle_mvcc: bool,
|
||||
}
|
||||
|
||||
impl Command {
|
||||
/// Execute `db copy` command
|
||||
pub fn execute(self, db: &mdbx::DatabaseEnv) -> eyre::Result<()> {
|
||||
let mut flags: ffi::MDBX_copy_flags_t = ffi::MDBX_CP_DEFAULTS;
|
||||
if self.compact {
|
||||
flags |= ffi::MDBX_CP_COMPACT;
|
||||
}
|
||||
if self.force_dynamic_size {
|
||||
flags |= ffi::MDBX_CP_FORCE_DYNAMIC_SIZE;
|
||||
}
|
||||
if self.throttle_mvcc {
|
||||
flags |= ffi::MDBX_CP_THROTTLE_MVCC;
|
||||
}
|
||||
|
||||
let dest = self
|
||||
.dest
|
||||
.to_str()
|
||||
.ok_or_else(|| eyre::eyre!("destination path must be valid UTF-8"))?;
|
||||
let dest_cstr = std::ffi::CString::new(dest)?;
|
||||
|
||||
println!("Copying database to {} ...", self.dest.display());
|
||||
|
||||
let rc = db.with_raw_env_ptr(|env_ptr| unsafe {
|
||||
ffi::mdbx_env_copy(env_ptr, dest_cstr.as_ptr(), flags)
|
||||
});
|
||||
|
||||
if rc != 0 {
|
||||
eyre::bail!("mdbx_env_copy failed with error code {rc}: {}", unsafe {
|
||||
std::ffi::CStr::from_ptr(ffi::mdbx_strerror(rc)).to_string_lossy()
|
||||
});
|
||||
}
|
||||
|
||||
println!("Done.");
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
@@ -98,7 +98,8 @@ impl Command {
|
||||
)?;
|
||||
|
||||
if let Some(entry) = entry {
|
||||
println!("{}", serde_json::to_string_pretty(&entry)?);
|
||||
let se: reth_primitives_traits::StorageEntry = entry.into();
|
||||
println!("{}", serde_json::to_string_pretty(&se)?);
|
||||
} else {
|
||||
error!(target: "reth::cli", "No content for the given table key.");
|
||||
}
|
||||
@@ -106,7 +107,14 @@ impl Command {
|
||||
}
|
||||
|
||||
let changesets = provider.storage_changeset(key.block_number())?;
|
||||
println!("{}", serde_json::to_string_pretty(&changesets)?);
|
||||
let serializable: Vec<_> = changesets
|
||||
.into_iter()
|
||||
.map(|(addr, entry)| {
|
||||
let se: reth_primitives_traits::StorageEntry = entry.into();
|
||||
(addr, se)
|
||||
})
|
||||
.collect();
|
||||
println!("{}", serde_json::to_string_pretty(&serializable)?);
|
||||
return Ok(());
|
||||
}
|
||||
|
||||
|
||||
@@ -12,6 +12,7 @@ use std::{
|
||||
mod account_storage;
|
||||
mod checksum;
|
||||
mod clear;
|
||||
mod copy;
|
||||
mod diff;
|
||||
mod get;
|
||||
mod list;
|
||||
@@ -42,6 +43,8 @@ pub enum Subcommands {
|
||||
List(list::Command),
|
||||
/// Calculates the content checksum of a table or static file segment
|
||||
Checksum(checksum::Command),
|
||||
/// Copies the MDBX database to a new location (bundled mdbx_copy)
|
||||
Copy(copy::Command),
|
||||
/// Create a diff between two database tables or two entire databases.
|
||||
Diff(diff::Command),
|
||||
/// Gets the content of a table for the given key
|
||||
@@ -70,23 +73,23 @@ pub enum Subcommands {
|
||||
State(state::Command),
|
||||
}
|
||||
|
||||
/// Initializes a provider factory with specified access rights, and then execute with the provided
|
||||
/// command
|
||||
macro_rules! db_exec {
|
||||
($env:expr, $tool:ident, $N:ident, $access_rights:expr, $command:block) => {
|
||||
let Environment { provider_factory, .. } = $env.init::<$N>($access_rights)?;
|
||||
|
||||
let $tool = DbTool::new(provider_factory)?;
|
||||
$command;
|
||||
};
|
||||
}
|
||||
|
||||
impl<C: ChainSpecParser<ChainSpec: EthChainSpec + EthereumHardforks>> Command<C> {
|
||||
/// Execute `db` command
|
||||
pub async fn execute<N: CliNodeTypes<ChainSpec = C::ChainSpec>>(
|
||||
self,
|
||||
ctx: CliContext,
|
||||
) -> eyre::Result<()> {
|
||||
/// Initializes a provider factory with specified access rights, and then executes the
|
||||
/// provided command.
|
||||
macro_rules! db_exec {
|
||||
($env:expr, $tool:ident, $N:ident, $access_rights:expr, $command:block) => {
|
||||
let Environment { provider_factory, .. } = $env.init::<$N>($access_rights)?;
|
||||
|
||||
let $tool = DbTool::new(provider_factory)?;
|
||||
$command;
|
||||
};
|
||||
}
|
||||
|
||||
let data_dir = self.env.datadir.clone().resolve_datadir(self.env.chain.chain());
|
||||
let db_path = data_dir.db();
|
||||
let static_files_path = data_dir.static_files();
|
||||
@@ -124,6 +127,11 @@ impl<C: ChainSpecParser<ChainSpec: EthChainSpec + EthereumHardforks>> Command<C>
|
||||
command.execute(&tool)?;
|
||||
});
|
||||
}
|
||||
Subcommands::Copy(command) => {
|
||||
db_exec!(self.env, tool, N, AccessRights::RO, {
|
||||
command.execute(tool.provider_factory.db_ref())?;
|
||||
});
|
||||
}
|
||||
Subcommands::Diff(command) => {
|
||||
db_exec!(self.env, tool, N, AccessRights::RO, {
|
||||
command.execute(&tool)?;
|
||||
|
||||
@@ -64,7 +64,7 @@ impl Command {
|
||||
let executor = task_executor.clone();
|
||||
let pprof_dump_dir = data_dir.pprof_dumps();
|
||||
|
||||
let handle = task_executor.spawn_critical("metrics server", async move {
|
||||
let handle = task_executor.spawn_critical_task("metrics server", async move {
|
||||
let config = MetricServerConfig::new(
|
||||
listen_addr,
|
||||
VersionInfo {
|
||||
|
||||
@@ -39,38 +39,12 @@ enum Subcommands {
|
||||
#[derive(Debug, Clone, Copy, Subcommand)]
|
||||
#[clap(rename_all = "snake_case")]
|
||||
pub enum SetCommand {
|
||||
/// Store receipts in static files instead of the database
|
||||
Receipts {
|
||||
#[clap(action(ArgAction::Set))]
|
||||
value: bool,
|
||||
},
|
||||
/// Store transaction senders in static files instead of the database
|
||||
TransactionSenders {
|
||||
#[clap(action(ArgAction::Set))]
|
||||
value: bool,
|
||||
},
|
||||
/// Store account changesets in static files instead of the database
|
||||
AccountChangesets {
|
||||
#[clap(action(ArgAction::Set))]
|
||||
value: bool,
|
||||
},
|
||||
/// Store storage history in rocksdb instead of MDBX
|
||||
StoragesHistory {
|
||||
#[clap(action(ArgAction::Set))]
|
||||
value: bool,
|
||||
},
|
||||
/// Store transaction hash to number mapping in rocksdb instead of MDBX
|
||||
TransactionHashNumbers {
|
||||
#[clap(action(ArgAction::Set))]
|
||||
value: bool,
|
||||
},
|
||||
/// Store account history in rocksdb instead of MDBX
|
||||
AccountHistory {
|
||||
#[clap(action(ArgAction::Set))]
|
||||
value: bool,
|
||||
},
|
||||
/// Store storage changesets in static files instead of the database
|
||||
StorageChangesets {
|
||||
/// Enable or disable v2 storage layout
|
||||
///
|
||||
/// When enabled, uses static files for receipts/senders/changesets and RocksDB for
|
||||
/// history indices and transaction hashes. When disabled, uses v1/legacy layout (everything in
|
||||
/// MDBX).
|
||||
V2 {
|
||||
#[clap(action(ArgAction::Set))]
|
||||
value: bool,
|
||||
},
|
||||
@@ -113,73 +87,18 @@ impl Command {
|
||||
println!("No storage settings found, creating new settings.");
|
||||
}
|
||||
|
||||
let mut settings @ StorageSettings {
|
||||
receipts_in_static_files: _,
|
||||
transaction_senders_in_static_files: _,
|
||||
storages_history_in_rocksdb: _,
|
||||
transaction_hash_numbers_in_rocksdb: _,
|
||||
account_history_in_rocksdb: _,
|
||||
account_changesets_in_static_files: _,
|
||||
storage_changesets_in_static_files: _,
|
||||
} = settings.unwrap_or_else(StorageSettings::v1);
|
||||
let mut settings @ StorageSettings { storage_v2: _ } =
|
||||
settings.unwrap_or_else(StorageSettings::v1);
|
||||
|
||||
// Update the setting based on the key
|
||||
match cmd {
|
||||
SetCommand::Receipts { value } => {
|
||||
if settings.receipts_in_static_files == value {
|
||||
println!("receipts_in_static_files is already set to {}", value);
|
||||
SetCommand::V2 { value } => {
|
||||
if settings.storage_v2 == value {
|
||||
println!("storage_v2 is already set to {}", value);
|
||||
return Ok(());
|
||||
}
|
||||
settings.receipts_in_static_files = value;
|
||||
println!("Set receipts_in_static_files = {}", value);
|
||||
}
|
||||
SetCommand::TransactionSenders { value } => {
|
||||
if settings.transaction_senders_in_static_files == value {
|
||||
println!("transaction_senders_in_static_files is already set to {}", value);
|
||||
return Ok(());
|
||||
}
|
||||
settings.transaction_senders_in_static_files = value;
|
||||
println!("Set transaction_senders_in_static_files = {}", value);
|
||||
}
|
||||
SetCommand::AccountChangesets { value } => {
|
||||
if settings.account_changesets_in_static_files == value {
|
||||
println!("account_changesets_in_static_files is already set to {}", value);
|
||||
return Ok(());
|
||||
}
|
||||
settings.account_changesets_in_static_files = value;
|
||||
println!("Set account_changesets_in_static_files = {}", value);
|
||||
}
|
||||
SetCommand::StoragesHistory { value } => {
|
||||
if settings.storages_history_in_rocksdb == value {
|
||||
println!("storages_history_in_rocksdb is already set to {}", value);
|
||||
return Ok(());
|
||||
}
|
||||
settings.storages_history_in_rocksdb = value;
|
||||
println!("Set storages_history_in_rocksdb = {}", value);
|
||||
}
|
||||
SetCommand::TransactionHashNumbers { value } => {
|
||||
if settings.transaction_hash_numbers_in_rocksdb == value {
|
||||
println!("transaction_hash_numbers_in_rocksdb is already set to {}", value);
|
||||
return Ok(());
|
||||
}
|
||||
settings.transaction_hash_numbers_in_rocksdb = value;
|
||||
println!("Set transaction_hash_numbers_in_rocksdb = {}", value);
|
||||
}
|
||||
SetCommand::AccountHistory { value } => {
|
||||
if settings.account_history_in_rocksdb == value {
|
||||
println!("account_history_in_rocksdb is already set to {}", value);
|
||||
return Ok(());
|
||||
}
|
||||
settings.account_history_in_rocksdb = value;
|
||||
println!("Set account_history_in_rocksdb = {}", value);
|
||||
}
|
||||
SetCommand::StorageChangesets { value } => {
|
||||
if settings.storage_changesets_in_static_files == value {
|
||||
println!("storage_changesets_in_static_files is already set to {}", value);
|
||||
return Ok(());
|
||||
}
|
||||
settings.storage_changesets_in_static_files = value;
|
||||
println!("Set storage_changesets_in_static_files = {}", value);
|
||||
settings.storage_v2 = value;
|
||||
println!("Set storage_v2 = {}", value);
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -1,4 +1,4 @@
|
||||
use alloy_primitives::{Address, BlockNumber, B256, U256};
|
||||
use alloy_primitives::{keccak256, Address, BlockNumber, B256, U256};
|
||||
use clap::Parser;
|
||||
use parking_lot::Mutex;
|
||||
use reth_db_api::{
|
||||
@@ -63,39 +63,65 @@ impl Command {
|
||||
address: Address,
|
||||
limit: usize,
|
||||
) -> eyre::Result<()> {
|
||||
let use_hashed_state = tool.provider_factory.cached_storage_settings().use_hashed_state();
|
||||
|
||||
let entries = tool.provider_factory.db_ref().view(|tx| {
|
||||
// Get account info
|
||||
let account = tx.get::<tables::PlainAccountState>(address)?;
|
||||
|
||||
// Get storage entries
|
||||
let mut cursor = tx.cursor_dup_read::<tables::PlainStorageState>()?;
|
||||
let mut entries = Vec::new();
|
||||
let mut last_log = Instant::now();
|
||||
|
||||
let walker = cursor.walk_dup(Some(address), None)?;
|
||||
for (idx, entry) in walker.enumerate() {
|
||||
let (_, storage_entry) = entry?;
|
||||
|
||||
if storage_entry.value != U256::ZERO {
|
||||
entries.push((storage_entry.key, storage_entry.value));
|
||||
let (account, walker_entries) = if use_hashed_state {
|
||||
let hashed_address = keccak256(address);
|
||||
let account = tx.get::<tables::HashedAccounts>(hashed_address)?;
|
||||
let mut cursor = tx.cursor_dup_read::<tables::HashedStorages>()?;
|
||||
let walker = cursor.walk_dup(Some(hashed_address), None)?;
|
||||
let mut entries = Vec::new();
|
||||
let mut last_log = Instant::now();
|
||||
for (idx, entry) in walker.enumerate() {
|
||||
let (_, storage_entry) = entry?;
|
||||
if storage_entry.value != U256::ZERO {
|
||||
entries.push((storage_entry.key, storage_entry.value));
|
||||
}
|
||||
if entries.len() >= limit {
|
||||
break;
|
||||
}
|
||||
if last_log.elapsed() >= LOG_INTERVAL {
|
||||
info!(
|
||||
target: "reth::cli",
|
||||
address = %address,
|
||||
slots_scanned = idx,
|
||||
"Scanning storage slots"
|
||||
);
|
||||
last_log = Instant::now();
|
||||
}
|
||||
}
|
||||
|
||||
if entries.len() >= limit {
|
||||
break;
|
||||
(account, entries)
|
||||
} else {
|
||||
// Get account info
|
||||
let account = tx.get::<tables::PlainAccountState>(address)?;
|
||||
// Get storage entries
|
||||
let mut cursor = tx.cursor_dup_read::<tables::PlainStorageState>()?;
|
||||
let walker = cursor.walk_dup(Some(address), None)?;
|
||||
let mut entries = Vec::new();
|
||||
let mut last_log = Instant::now();
|
||||
for (idx, entry) in walker.enumerate() {
|
||||
let (_, storage_entry) = entry?;
|
||||
if storage_entry.value != U256::ZERO {
|
||||
entries.push((storage_entry.key, storage_entry.value));
|
||||
}
|
||||
if entries.len() >= limit {
|
||||
break;
|
||||
}
|
||||
if last_log.elapsed() >= LOG_INTERVAL {
|
||||
info!(
|
||||
target: "reth::cli",
|
||||
address = %address,
|
||||
slots_scanned = idx,
|
||||
"Scanning storage slots"
|
||||
);
|
||||
last_log = Instant::now();
|
||||
}
|
||||
}
|
||||
(account, entries)
|
||||
};
|
||||
|
||||
if last_log.elapsed() >= LOG_INTERVAL {
|
||||
info!(
|
||||
target: "reth::cli",
|
||||
address = %address,
|
||||
slots_scanned = idx,
|
||||
"Scanning storage slots"
|
||||
);
|
||||
last_log = Instant::now();
|
||||
}
|
||||
}
|
||||
|
||||
Ok::<_, eyre::Report>((account, entries))
|
||||
Ok::<_, eyre::Report>((account, walker_entries))
|
||||
})??;
|
||||
|
||||
let (account, storage_entries) = entries;
|
||||
@@ -119,7 +145,7 @@ impl Command {
|
||||
|
||||
// Check storage settings to determine where history is stored
|
||||
let storage_settings = tool.provider_factory.cached_storage_settings();
|
||||
let history_in_rocksdb = storage_settings.storages_history_in_rocksdb;
|
||||
let history_in_rocksdb = storage_settings.storage_v2;
|
||||
|
||||
// For historical queries, enumerate keys from history indices only
|
||||
// (not PlainStorageState, which reflects current state)
|
||||
|
||||
@@ -37,6 +37,14 @@ pub struct DownloadDefaults {
|
||||
pub available_snapshots: Vec<Cow<'static, str>>,
|
||||
/// Default base URL for snapshots
|
||||
pub default_base_url: Cow<'static, str>,
|
||||
/// Default base URL for chain-aware snapshots.
|
||||
///
|
||||
/// When set, the chain ID is appended to form the full URL: `{base_url}/{chain_id}`.
|
||||
/// For example, given a base URL of `https://snapshots.example.com` and chain ID `1`,
|
||||
/// the resulting URL would be `https://snapshots.example.com/1`.
|
||||
///
|
||||
/// Falls back to [`default_base_url`](Self::default_base_url) when `None`.
|
||||
pub default_chain_aware_base_url: Option<Cow<'static, str>>,
|
||||
/// Optional custom long help text that overrides the generated help
|
||||
pub long_help: Option<String>,
|
||||
}
|
||||
@@ -60,6 +68,7 @@ impl DownloadDefaults {
|
||||
Cow::Borrowed("https://publicnode.com/snapshots (full nodes & testnets)"),
|
||||
],
|
||||
default_base_url: Cow::Borrowed(MERKLE_BASE_URL),
|
||||
default_chain_aware_base_url: None,
|
||||
long_help: None,
|
||||
}
|
||||
}
|
||||
@@ -84,9 +93,11 @@ impl DownloadDefaults {
|
||||
}
|
||||
|
||||
help.push_str(
|
||||
"\nIf no URL is provided, the latest mainnet archive snapshot\nwill be proposed for download from ",
|
||||
"\nIf no URL is provided, the latest archive snapshot for the selected chain\nwill be proposed for download from ",
|
||||
);
|
||||
help.push_str(
|
||||
self.default_chain_aware_base_url.as_deref().unwrap_or(&self.default_base_url),
|
||||
);
|
||||
help.push_str(self.default_base_url.as_ref());
|
||||
help.push_str(
|
||||
".\n\nLocal file:// URLs are also supported for extracting snapshots from disk.",
|
||||
);
|
||||
@@ -111,6 +122,12 @@ impl DownloadDefaults {
|
||||
self
|
||||
}
|
||||
|
||||
/// Set the default chain-aware base URL.
|
||||
pub fn with_chain_aware_base_url(mut self, url: impl Into<Cow<'static, str>>) -> Self {
|
||||
self.default_chain_aware_base_url = Some(url.into());
|
||||
self
|
||||
}
|
||||
|
||||
/// Builder: Set custom long help text, overriding the generated help
|
||||
pub fn with_long_help(mut self, help: impl Into<String>) -> Self {
|
||||
self.long_help = Some(help.into());
|
||||
@@ -142,7 +159,7 @@ impl<C: ChainSpecParser<ChainSpec: EthChainSpec + EthereumHardforks>> DownloadCo
|
||||
let url = match self.url {
|
||||
Some(url) => url,
|
||||
None => {
|
||||
let url = get_latest_snapshot_url().await?;
|
||||
let url = get_latest_snapshot_url(self.env.chain.chain().id()).await?;
|
||||
info!(target: "reth::cli", "Using default snapshot URL: {}", url);
|
||||
url
|
||||
}
|
||||
@@ -509,8 +526,12 @@ async fn stream_and_extract(url: &str, target_dir: &Path) -> Result<()> {
|
||||
}
|
||||
|
||||
// Builds default URL for latest mainnet archive snapshot using configured defaults
|
||||
async fn get_latest_snapshot_url() -> Result<String> {
|
||||
let base_url = &DownloadDefaults::get_global().default_base_url;
|
||||
async fn get_latest_snapshot_url(chain_id: u64) -> Result<String> {
|
||||
let defaults = DownloadDefaults::get_global();
|
||||
let base_url = match &defaults.default_chain_aware_base_url {
|
||||
Some(url) => format!("{url}/{chain_id}"),
|
||||
None => defaults.default_base_url.to_string(),
|
||||
};
|
||||
let latest_url = format!("{base_url}/latest.txt");
|
||||
let filename = Client::new()
|
||||
.get(latest_url)
|
||||
|
||||
@@ -10,8 +10,8 @@ use reth_node_builder::NodeBuilder;
|
||||
use reth_node_core::{
|
||||
args::{
|
||||
DatabaseArgs, DatadirArgs, DebugArgs, DevArgs, EngineArgs, EraArgs, MetricArgs,
|
||||
NetworkArgs, PayloadBuilderArgs, PruningArgs, RocksDbArgs, RpcServerArgs, StaticFilesArgs,
|
||||
StorageArgs, TxPoolArgs,
|
||||
NetworkArgs, PayloadBuilderArgs, PruningArgs, RpcServerArgs, StaticFilesArgs, StorageArgs,
|
||||
TxPoolArgs,
|
||||
},
|
||||
node_config::NodeConfig,
|
||||
version,
|
||||
@@ -103,10 +103,6 @@ pub struct NodeCommand<C: ChainSpecParser, Ext: clap::Args + fmt::Debug = NoArgs
|
||||
#[command(flatten)]
|
||||
pub pruning: PruningArgs,
|
||||
|
||||
/// All `RocksDB` table routing arguments
|
||||
#[command(flatten)]
|
||||
pub rocksdb: RocksDbArgs,
|
||||
|
||||
/// Engine cli arguments
|
||||
#[command(flatten, next_help_heading = "Engine")]
|
||||
pub engine: EngineArgs,
|
||||
@@ -119,8 +115,8 @@ pub struct NodeCommand<C: ChainSpecParser, Ext: clap::Args + fmt::Debug = NoArgs
|
||||
#[command(flatten, next_help_heading = "Static Files")]
|
||||
pub static_files: StaticFilesArgs,
|
||||
|
||||
/// Storage mode configuration (v2 vs v1/legacy)
|
||||
#[command(flatten)]
|
||||
/// All storage related arguments with --storage prefix
|
||||
#[command(flatten, next_help_heading = "Storage")]
|
||||
pub storage: StorageArgs,
|
||||
|
||||
/// Additional cli arguments
|
||||
@@ -175,7 +171,6 @@ where
|
||||
db,
|
||||
dev,
|
||||
pruning,
|
||||
rocksdb,
|
||||
engine,
|
||||
era,
|
||||
static_files,
|
||||
@@ -183,9 +178,6 @@ where
|
||||
ext,
|
||||
} = self;
|
||||
|
||||
// Validate RocksDB arguments
|
||||
rocksdb.validate()?;
|
||||
|
||||
// set up node config
|
||||
let mut node_config = NodeConfig {
|
||||
datadir,
|
||||
@@ -201,7 +193,6 @@ where
|
||||
db,
|
||||
dev,
|
||||
pruning,
|
||||
rocksdb,
|
||||
engine,
|
||||
era,
|
||||
static_files,
|
||||
|
||||
@@ -76,7 +76,7 @@ impl<C: ChainSpecParser<ChainSpec: EthChainSpec + EthereumHardforks>> PruneComma
|
||||
// Set up cancellation token for graceful shutdown on Ctrl+C
|
||||
let cancellation = CancellationToken::new();
|
||||
let cancellation_clone = cancellation.clone();
|
||||
ctx.task_executor.spawn_critical("prune-ctrl-c", async move {
|
||||
ctx.task_executor.spawn_critical_task("prune-ctrl-c", async move {
|
||||
tokio::signal::ctrl_c().await.expect("failed to listen for ctrl-c");
|
||||
cancellation_clone.cancel();
|
||||
});
|
||||
|
||||
@@ -9,7 +9,10 @@ use reth_db_api::{
|
||||
transaction::{DbTx, DbTxMut},
|
||||
};
|
||||
use reth_db_common::{
|
||||
init::{insert_genesis_header, insert_genesis_history, insert_genesis_state},
|
||||
init::{
|
||||
insert_genesis_account_history, insert_genesis_header, insert_genesis_state,
|
||||
insert_genesis_storage_history,
|
||||
},
|
||||
DbTool,
|
||||
};
|
||||
use reth_node_api::{HeaderTy, ReceiptTy, TxTy};
|
||||
@@ -42,12 +45,16 @@ impl<C: ChainSpecParser> Command<C> {
|
||||
|
||||
let tool = DbTool::new(provider_factory)?;
|
||||
|
||||
let static_file_segment = match self.stage {
|
||||
StageEnum::Headers => Some(StaticFileSegment::Headers),
|
||||
StageEnum::Bodies => Some(StaticFileSegment::Transactions),
|
||||
StageEnum::Execution => Some(StaticFileSegment::Receipts),
|
||||
StageEnum::Senders => Some(StaticFileSegment::TransactionSenders),
|
||||
_ => None,
|
||||
let static_file_segments = match self.stage {
|
||||
StageEnum::Headers => vec![StaticFileSegment::Headers],
|
||||
StageEnum::Bodies => vec![StaticFileSegment::Transactions],
|
||||
StageEnum::Execution => vec![
|
||||
StaticFileSegment::Receipts,
|
||||
StaticFileSegment::AccountChangeSets,
|
||||
StaticFileSegment::StorageChangeSets,
|
||||
],
|
||||
StageEnum::Senders => vec![StaticFileSegment::TransactionSenders],
|
||||
_ => vec![],
|
||||
};
|
||||
|
||||
// Calling `StaticFileProviderRW::prune_*` will instruct the writer to prune rows only
|
||||
@@ -55,35 +62,33 @@ impl<C: ChainSpecParser> Command<C> {
|
||||
// deleting the jar files, otherwise if the task were to be interrupted after we
|
||||
// have deleted them, BUT before we have committed the checkpoints to the database, we'd
|
||||
// lose essential data.
|
||||
if let Some(static_file_segment) = static_file_segment {
|
||||
let static_file_provider = tool.provider_factory.static_file_provider();
|
||||
if let Some(highest_block) =
|
||||
static_file_provider.get_highest_static_file_block(static_file_segment)
|
||||
let static_file_provider = tool.provider_factory.static_file_provider();
|
||||
for segment in static_file_segments {
|
||||
if let Some(highest_block) = static_file_provider.get_highest_static_file_block(segment)
|
||||
{
|
||||
let mut writer = static_file_provider.latest_writer(static_file_segment)?;
|
||||
let mut writer = static_file_provider.latest_writer(segment)?;
|
||||
|
||||
match static_file_segment {
|
||||
match segment {
|
||||
StaticFileSegment::Headers => {
|
||||
// Prune all headers leaving genesis intact.
|
||||
writer.prune_headers(highest_block)?;
|
||||
}
|
||||
StaticFileSegment::Transactions => {
|
||||
let to_delete = static_file_provider
|
||||
.get_highest_static_file_tx(static_file_segment)
|
||||
.get_highest_static_file_tx(segment)
|
||||
.map(|tx_num| tx_num + 1)
|
||||
.unwrap_or_default();
|
||||
writer.prune_transactions(to_delete, 0)?;
|
||||
}
|
||||
StaticFileSegment::Receipts => {
|
||||
let to_delete = static_file_provider
|
||||
.get_highest_static_file_tx(static_file_segment)
|
||||
.get_highest_static_file_tx(segment)
|
||||
.map(|tx_num| tx_num + 1)
|
||||
.unwrap_or_default();
|
||||
writer.prune_receipts(to_delete, 0)?;
|
||||
}
|
||||
StaticFileSegment::TransactionSenders => {
|
||||
let to_delete = static_file_provider
|
||||
.get_highest_static_file_tx(static_file_segment)
|
||||
.get_highest_static_file_tx(segment)
|
||||
.map(|tx_num| tx_num + 1)
|
||||
.unwrap_or_default();
|
||||
writer.prune_transaction_senders(to_delete, 0)?;
|
||||
@@ -128,8 +133,15 @@ impl<C: ChainSpecParser> Command<C> {
|
||||
reset_stage_checkpoint(tx, StageId::SenderRecovery)?;
|
||||
}
|
||||
StageEnum::Execution => {
|
||||
tx.clear::<tables::PlainAccountState>()?;
|
||||
tx.clear::<tables::PlainStorageState>()?;
|
||||
if provider_rw.cached_storage_settings().use_hashed_state() {
|
||||
tx.clear::<tables::HashedAccounts>()?;
|
||||
tx.clear::<tables::HashedStorages>()?;
|
||||
reset_stage_checkpoint(tx, StageId::AccountHashing)?;
|
||||
reset_stage_checkpoint(tx, StageId::StorageHashing)?;
|
||||
} else {
|
||||
tx.clear::<tables::PlainAccountState>()?;
|
||||
tx.clear::<tables::PlainStorageState>()?;
|
||||
}
|
||||
tx.clear::<tables::AccountChangeSets>()?;
|
||||
tx.clear::<tables::StorageChangeSets>()?;
|
||||
tx.clear::<tables::Bytecodes>()?;
|
||||
@@ -171,29 +183,42 @@ impl<C: ChainSpecParser> Command<C> {
|
||||
None,
|
||||
)?;
|
||||
}
|
||||
StageEnum::AccountHistory | StageEnum::StorageHistory => {
|
||||
StageEnum::AccountHistory => {
|
||||
let settings = provider_rw.cached_storage_settings();
|
||||
let rocksdb = tool.provider_factory.rocksdb_provider();
|
||||
|
||||
if settings.account_history_in_rocksdb {
|
||||
if settings.storage_v2 {
|
||||
rocksdb.clear::<tables::AccountsHistory>()?;
|
||||
} else {
|
||||
tx.clear::<tables::AccountsHistory>()?;
|
||||
}
|
||||
|
||||
if settings.storages_history_in_rocksdb {
|
||||
reset_stage_checkpoint(tx, StageId::IndexAccountHistory)?;
|
||||
|
||||
insert_genesis_account_history(
|
||||
&provider_rw,
|
||||
self.env.chain.genesis().alloc.iter(),
|
||||
)?;
|
||||
}
|
||||
StageEnum::StorageHistory => {
|
||||
let settings = provider_rw.cached_storage_settings();
|
||||
let rocksdb = tool.provider_factory.rocksdb_provider();
|
||||
|
||||
if settings.storage_v2 {
|
||||
rocksdb.clear::<tables::StoragesHistory>()?;
|
||||
} else {
|
||||
tx.clear::<tables::StoragesHistory>()?;
|
||||
}
|
||||
|
||||
reset_stage_checkpoint(tx, StageId::IndexAccountHistory)?;
|
||||
reset_stage_checkpoint(tx, StageId::IndexStorageHistory)?;
|
||||
|
||||
insert_genesis_history(&provider_rw, self.env.chain.genesis().alloc.iter())?;
|
||||
insert_genesis_storage_history(
|
||||
&provider_rw,
|
||||
self.env.chain.genesis().alloc.iter(),
|
||||
)?;
|
||||
}
|
||||
StageEnum::TxLookup => {
|
||||
if provider_rw.cached_storage_settings().transaction_hash_numbers_in_rocksdb {
|
||||
if provider_rw.cached_storage_settings().storage_v2 {
|
||||
tool.provider_factory
|
||||
.rocksdb_provider()
|
||||
.clear::<tables::TransactionHashNumbers>()?;
|
||||
|
||||
@@ -37,12 +37,14 @@ where
|
||||
unwind_and_copy(db_tool, from, tip_block_number, &output_db, evm_config.clone())?;
|
||||
|
||||
if should_run {
|
||||
let runtime = reth_tasks::Runtime::with_existing_handle(tokio::runtime::Handle::current())?;
|
||||
dry_run(
|
||||
ProviderFactory::<N>::new(
|
||||
output_db,
|
||||
db_tool.chain(),
|
||||
StaticFileProvider::read_write(output_datadir.static_files())?,
|
||||
RocksDBProvider::builder(output_datadir.rocksdb()).build()?,
|
||||
runtime,
|
||||
)?,
|
||||
to,
|
||||
from,
|
||||
|
||||
@@ -33,12 +33,14 @@ pub(crate) async fn dump_hashing_account_stage<N: ProviderNodeTypes<DB = Databas
|
||||
unwind_and_copy(db_tool, from, tip_block_number, &output_db)?;
|
||||
|
||||
if should_run {
|
||||
let runtime = reth_tasks::Runtime::with_existing_handle(tokio::runtime::Handle::current())?;
|
||||
dry_run(
|
||||
ProviderFactory::<N>::new(
|
||||
output_db,
|
||||
db_tool.chain(),
|
||||
StaticFileProvider::read_write(output_datadir.static_files())?,
|
||||
RocksDBProvider::builder(output_datadir.rocksdb()).build()?,
|
||||
runtime,
|
||||
)?,
|
||||
to,
|
||||
from,
|
||||
|
||||
@@ -23,12 +23,14 @@ pub(crate) async fn dump_hashing_storage_stage<N: ProviderNodeTypes<DB = Databas
|
||||
unwind_and_copy(db_tool, from, tip_block_number, &output_db)?;
|
||||
|
||||
if should_run {
|
||||
let runtime = reth_tasks::Runtime::with_existing_handle(tokio::runtime::Handle::current())?;
|
||||
dry_run(
|
||||
ProviderFactory::<N>::new(
|
||||
output_db,
|
||||
db_tool.chain(),
|
||||
StaticFileProvider::read_write(output_datadir.static_files())?,
|
||||
RocksDBProvider::builder(output_datadir.rocksdb()).build()?,
|
||||
runtime,
|
||||
)?,
|
||||
to,
|
||||
from,
|
||||
|
||||
@@ -57,12 +57,14 @@ where
|
||||
unwind_and_copy(db_tool, (from, to), tip_block_number, &output_db, evm_config, consensus)?;
|
||||
|
||||
if should_run {
|
||||
let runtime = reth_tasks::Runtime::with_existing_handle(tokio::runtime::Handle::current())?;
|
||||
dry_run(
|
||||
ProviderFactory::<N>::new(
|
||||
output_db,
|
||||
db_tool.chain(),
|
||||
StaticFileProvider::read_write(output_datadir.static_files())?,
|
||||
RocksDBProvider::builder(output_datadir.rocksdb()).build()?,
|
||||
runtime,
|
||||
)?,
|
||||
to,
|
||||
from,
|
||||
|
||||
@@ -248,9 +248,10 @@ impl<C: ChainSpecParser<ChainSpec: EthChainSpec + Hardforks + EthereumHardforks>
|
||||
(Box::new(stage), None)
|
||||
}
|
||||
StageEnum::Senders => (
|
||||
Box::new(SenderRecoveryStage::new(SenderRecoveryConfig {
|
||||
commit_threshold: batch_size,
|
||||
})),
|
||||
Box::new(SenderRecoveryStage::new(
|
||||
SenderRecoveryConfig { commit_threshold: batch_size },
|
||||
None,
|
||||
)),
|
||||
None,
|
||||
),
|
||||
StageEnum::Execution => (
|
||||
|
||||
@@ -10,9 +10,10 @@
|
||||
|
||||
//! Entrypoint for running commands.
|
||||
|
||||
use reth_tasks::{TaskExecutor, TaskManager};
|
||||
use reth_tasks::{PanickedTaskError, TaskExecutor};
|
||||
use std::{future::Future, pin::pin, sync::mpsc, time::Duration};
|
||||
use tracing::{debug, error, trace};
|
||||
use tokio::task::JoinHandle;
|
||||
use tracing::{debug, error, info};
|
||||
|
||||
/// Executes CLI commands.
|
||||
///
|
||||
@@ -20,21 +21,24 @@ use tracing::{debug, error, trace};
|
||||
#[derive(Debug)]
|
||||
pub struct CliRunner {
|
||||
config: CliRunnerConfig,
|
||||
tokio_runtime: tokio::runtime::Runtime,
|
||||
runtime: reth_tasks::Runtime,
|
||||
}
|
||||
|
||||
impl CliRunner {
|
||||
/// Attempts to create a new [`CliRunner`] using the default tokio
|
||||
/// [`Runtime`](tokio::runtime::Runtime).
|
||||
/// Attempts to create a new [`CliRunner`] using the default
|
||||
/// [`Runtime`](reth_tasks::Runtime).
|
||||
///
|
||||
/// The default tokio runtime is multi-threaded, with both I/O and time drivers enabled.
|
||||
pub fn try_default_runtime() -> Result<Self, std::io::Error> {
|
||||
Ok(Self { config: CliRunnerConfig::default(), tokio_runtime: tokio_runtime()? })
|
||||
/// The default runtime is multi-threaded, with both I/O and time drivers enabled.
|
||||
pub fn try_default_runtime() -> Result<Self, reth_tasks::RuntimeBuildError> {
|
||||
Self::try_with_runtime_config(reth_tasks::RuntimeConfig::default())
|
||||
}
|
||||
|
||||
/// Create a new [`CliRunner`] from a provided tokio [`Runtime`](tokio::runtime::Runtime).
|
||||
pub const fn from_runtime(tokio_runtime: tokio::runtime::Runtime) -> Self {
|
||||
Self { config: CliRunnerConfig::new(), tokio_runtime }
|
||||
/// Creates a new [`CliRunner`] with the given [`RuntimeConfig`](reth_tasks::RuntimeConfig).
|
||||
pub fn try_with_runtime_config(
|
||||
config: reth_tasks::RuntimeConfig,
|
||||
) -> Result<Self, reth_tasks::RuntimeBuildError> {
|
||||
let runtime = reth_tasks::RuntimeBuilder::new(config).build()?;
|
||||
Ok(Self { config: CliRunnerConfig::default(), runtime })
|
||||
}
|
||||
|
||||
/// Sets the [`CliRunnerConfig`] for this runner.
|
||||
@@ -48,7 +52,7 @@ impl CliRunner {
|
||||
where
|
||||
F: Future<Output = T>,
|
||||
{
|
||||
self.tokio_runtime.block_on(fut)
|
||||
self.runtime.handle().block_on(fut)
|
||||
}
|
||||
|
||||
/// Executes the given _async_ command on the tokio runtime until the command future resolves or
|
||||
@@ -64,12 +68,11 @@ impl CliRunner {
|
||||
F: Future<Output = Result<(), E>>,
|
||||
E: Send + Sync + From<std::io::Error> + From<reth_tasks::PanickedTaskError> + 'static,
|
||||
{
|
||||
let AsyncCliRunner { context, mut task_manager, tokio_runtime } =
|
||||
AsyncCliRunner::new(self.tokio_runtime);
|
||||
let (context, task_manager_handle) = cli_context(&self.runtime);
|
||||
|
||||
// Executes the command until it finished or ctrl-c was fired
|
||||
let command_res = tokio_runtime.block_on(run_to_completion_or_panic(
|
||||
&mut task_manager,
|
||||
let command_res = self.runtime.handle().block_on(run_to_completion_or_panic(
|
||||
task_manager_handle,
|
||||
run_until_ctrl_c(command(context)),
|
||||
));
|
||||
|
||||
@@ -77,13 +80,13 @@ impl CliRunner {
|
||||
error!(target: "reth::cli", "shutting down due to error");
|
||||
} else {
|
||||
debug!(target: "reth::cli", "shutting down gracefully");
|
||||
// after the command has finished or exit signal was received we shutdown the task
|
||||
// manager which fires the shutdown signal to all tasks spawned via the task
|
||||
// after the command has finished or exit signal was received we shutdown the
|
||||
// runtime which fires the shutdown signal to all tasks spawned via the task
|
||||
// executor and awaiting on tasks spawned with graceful shutdown
|
||||
task_manager.graceful_shutdown_with_timeout(self.config.graceful_shutdown_timeout);
|
||||
self.runtime.graceful_shutdown_with_timeout(self.config.graceful_shutdown_timeout);
|
||||
}
|
||||
|
||||
tokio_shutdown(tokio_runtime, true);
|
||||
runtime_shutdown(self.runtime, true);
|
||||
|
||||
command_res
|
||||
}
|
||||
@@ -99,17 +102,16 @@ impl CliRunner {
|
||||
F: Future<Output = Result<(), E>> + Send + 'static,
|
||||
E: Send + Sync + From<std::io::Error> + From<reth_tasks::PanickedTaskError> + 'static,
|
||||
{
|
||||
let AsyncCliRunner { context, mut task_manager, tokio_runtime } =
|
||||
AsyncCliRunner::new(self.tokio_runtime);
|
||||
let (context, task_manager_handle) = cli_context(&self.runtime);
|
||||
|
||||
// Spawn the command on the blocking thread pool
|
||||
let handle = tokio_runtime.handle().clone();
|
||||
let command_handle =
|
||||
tokio_runtime.handle().spawn_blocking(move || handle.block_on(command(context)));
|
||||
let handle = self.runtime.handle().clone();
|
||||
let handle2 = handle.clone();
|
||||
let command_handle = handle.spawn_blocking(move || handle2.block_on(command(context)));
|
||||
|
||||
// Wait for the command to complete or ctrl-c
|
||||
let command_res = tokio_runtime.block_on(run_to_completion_or_panic(
|
||||
&mut task_manager,
|
||||
let command_res = self.runtime.handle().block_on(run_to_completion_or_panic(
|
||||
task_manager_handle,
|
||||
run_until_ctrl_c(
|
||||
async move { command_handle.await.expect("Failed to join blocking task") },
|
||||
),
|
||||
@@ -119,10 +121,10 @@ impl CliRunner {
|
||||
error!(target: "reth::cli", "shutting down due to error");
|
||||
} else {
|
||||
debug!(target: "reth::cli", "shutting down gracefully");
|
||||
task_manager.graceful_shutdown_with_timeout(self.config.graceful_shutdown_timeout);
|
||||
self.runtime.graceful_shutdown_with_timeout(self.config.graceful_shutdown_timeout);
|
||||
}
|
||||
|
||||
tokio_shutdown(tokio_runtime, true);
|
||||
runtime_shutdown(self.runtime, true);
|
||||
|
||||
command_res
|
||||
}
|
||||
@@ -133,48 +135,40 @@ impl CliRunner {
|
||||
F: Future<Output = Result<(), E>>,
|
||||
E: Send + Sync + From<std::io::Error> + 'static,
|
||||
{
|
||||
self.tokio_runtime.block_on(run_until_ctrl_c(fut))?;
|
||||
self.runtime.handle().block_on(run_until_ctrl_c(fut))?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Executes a regular future as a spawned blocking task until completion or until external
|
||||
/// signal received.
|
||||
///
|
||||
/// See [`Runtime::spawn_blocking`](tokio::runtime::Runtime::spawn_blocking) .
|
||||
/// See [`Runtime::spawn_blocking`](tokio::runtime::Runtime::spawn_blocking).
|
||||
pub fn run_blocking_until_ctrl_c<F, E>(self, fut: F) -> Result<(), E>
|
||||
where
|
||||
F: Future<Output = Result<(), E>> + Send + 'static,
|
||||
E: Send + Sync + From<std::io::Error> + 'static,
|
||||
{
|
||||
let tokio_runtime = self.tokio_runtime;
|
||||
let handle = tokio_runtime.handle().clone();
|
||||
let fut = tokio_runtime.handle().spawn_blocking(move || handle.block_on(fut));
|
||||
tokio_runtime
|
||||
let handle = self.runtime.handle().clone();
|
||||
let handle2 = handle.clone();
|
||||
let fut = handle.spawn_blocking(move || handle2.block_on(fut));
|
||||
self.runtime
|
||||
.handle()
|
||||
.block_on(run_until_ctrl_c(async move { fut.await.expect("Failed to join task") }))?;
|
||||
|
||||
tokio_shutdown(tokio_runtime, false);
|
||||
runtime_shutdown(self.runtime, false);
|
||||
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
/// [`CliRunner`] configuration when executing commands asynchronously
|
||||
struct AsyncCliRunner {
|
||||
context: CliContext,
|
||||
task_manager: TaskManager,
|
||||
tokio_runtime: tokio::runtime::Runtime,
|
||||
}
|
||||
|
||||
// === impl AsyncCliRunner ===
|
||||
|
||||
impl AsyncCliRunner {
|
||||
/// Given a tokio [`Runtime`](tokio::runtime::Runtime), creates additional context required to
|
||||
/// execute commands asynchronously.
|
||||
fn new(tokio_runtime: tokio::runtime::Runtime) -> Self {
|
||||
let task_manager = TaskManager::new(tokio_runtime.handle().clone());
|
||||
let task_executor = task_manager.executor();
|
||||
Self { context: CliContext { task_executor }, task_manager, tokio_runtime }
|
||||
}
|
||||
/// Extracts the task manager handle from the runtime and creates the [`CliContext`].
|
||||
fn cli_context(
|
||||
runtime: &reth_tasks::Runtime,
|
||||
) -> (CliContext, JoinHandle<Result<(), PanickedTaskError>>) {
|
||||
let handle =
|
||||
runtime.take_task_manager_handle().expect("Runtime must contain a TaskManager handle");
|
||||
let context = CliContext { task_executor: runtime.clone() };
|
||||
(context, handle)
|
||||
}
|
||||
|
||||
/// Additional context provided by the [`CliRunner`] when executing commands
|
||||
@@ -216,37 +210,25 @@ impl CliRunnerConfig {
|
||||
}
|
||||
}
|
||||
|
||||
/// Creates a new default tokio multi-thread [Runtime](tokio::runtime::Runtime) with all features
|
||||
/// enabled
|
||||
pub fn tokio_runtime() -> Result<tokio::runtime::Runtime, std::io::Error> {
|
||||
tokio::runtime::Builder::new_multi_thread()
|
||||
.enable_all()
|
||||
// Keep the threads alive for at least the block time (12 seconds) plus buffer.
|
||||
// This prevents the costly process of spawning new threads on every
|
||||
// new block, and instead reuses the existing threads.
|
||||
.thread_keep_alive(Duration::from_secs(15))
|
||||
.thread_name("tokio-rt")
|
||||
.build()
|
||||
}
|
||||
|
||||
/// Runs the given future to completion or until a critical task panicked.
|
||||
///
|
||||
/// Returns the error if a task panicked, or the given future returned an error.
|
||||
async fn run_to_completion_or_panic<F, E>(tasks: &mut TaskManager, fut: F) -> Result<(), E>
|
||||
async fn run_to_completion_or_panic<F, E>(
|
||||
task_manager_handle: JoinHandle<Result<(), PanickedTaskError>>,
|
||||
fut: F,
|
||||
) -> Result<(), E>
|
||||
where
|
||||
F: Future<Output = Result<(), E>>,
|
||||
E: Send + Sync + From<reth_tasks::PanickedTaskError> + 'static,
|
||||
{
|
||||
{
|
||||
let fut = pin!(fut);
|
||||
tokio::select! {
|
||||
task_manager_result = tasks => {
|
||||
if let Err(panicked_error) = task_manager_result {
|
||||
return Err(panicked_error.into());
|
||||
}
|
||||
},
|
||||
res = fut => res?,
|
||||
}
|
||||
let fut = pin!(fut);
|
||||
tokio::select! {
|
||||
task_manager_result = task_manager_handle => {
|
||||
if let Ok(Err(panicked_error)) = task_manager_result {
|
||||
return Err(panicked_error.into());
|
||||
}
|
||||
},
|
||||
res = fut => res?,
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
@@ -271,10 +253,10 @@ where
|
||||
|
||||
tokio::select! {
|
||||
_ = ctrl_c => {
|
||||
trace!(target: "reth::cli", "Received ctrl-c");
|
||||
info!(target: "reth::cli", "Received ctrl-c");
|
||||
},
|
||||
_ = sigterm => {
|
||||
trace!(target: "reth::cli", "Received SIGTERM");
|
||||
info!(target: "reth::cli", "Received SIGTERM");
|
||||
},
|
||||
res = fut => res?,
|
||||
}
|
||||
@@ -287,7 +269,7 @@ where
|
||||
|
||||
tokio::select! {
|
||||
_ = ctrl_c => {
|
||||
trace!(target: "reth::cli", "Received ctrl-c");
|
||||
info!(target: "reth::cli", "Received ctrl-c");
|
||||
},
|
||||
res = fut => res?,
|
||||
}
|
||||
@@ -296,17 +278,17 @@ where
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Shut down the given Tokio runtime, and wait for it if `wait` is set.
|
||||
/// Default timeout for waiting on the tokio runtime to shut down.
|
||||
const DEFAULT_RUNTIME_SHUTDOWN_TIMEOUT: Duration = Duration::from_secs(5);
|
||||
|
||||
/// Shut down the given [`Runtime`](reth_tasks::Runtime), and wait for it if `wait` is set.
|
||||
///
|
||||
/// `drop(tokio_runtime)` would block the current thread until its pools
|
||||
/// (including blocking pool) are shutdown. Since we want to exit as soon as possible, drop
|
||||
/// it on a separate thread and wait for up to 5 seconds for this operation to
|
||||
/// complete.
|
||||
fn tokio_shutdown(rt: tokio::runtime::Runtime, wait: bool) {
|
||||
// Shutdown the runtime on a separate thread
|
||||
/// Dropping the runtime on the current thread could block due to tokio pool teardown.
|
||||
/// Instead, we drop it on a separate thread and optionally wait for completion.
|
||||
fn runtime_shutdown(rt: reth_tasks::Runtime, wait: bool) {
|
||||
let (tx, rx) = mpsc::channel();
|
||||
std::thread::Builder::new()
|
||||
.name("tokio-shutdown".to_string())
|
||||
.name("rt-shutdown".to_string())
|
||||
.spawn(move || {
|
||||
drop(rt);
|
||||
let _ = tx.send(());
|
||||
@@ -314,8 +296,8 @@ fn tokio_shutdown(rt: tokio::runtime::Runtime, wait: bool) {
|
||||
.unwrap();
|
||||
|
||||
if wait {
|
||||
let _ = rx.recv_timeout(Duration::from_secs(5)).inspect_err(|err| {
|
||||
debug!(target: "reth::cli", %err, "tokio runtime shutdown timed out");
|
||||
let _ = rx.recv_timeout(DEFAULT_RUNTIME_SHUTDOWN_TIMEOUT).inspect_err(|err| {
|
||||
tracing::warn!(target: "reth::cli", %err, "runtime shutdown timed out");
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
@@ -11,7 +11,6 @@ use reth_node_builder::{
|
||||
PayloadTypes,
|
||||
};
|
||||
use reth_provider::providers::{BlockchainProvider, NodeTypesForProvider};
|
||||
use reth_tasks::TaskManager;
|
||||
use std::sync::Arc;
|
||||
use wallet::Wallet;
|
||||
|
||||
@@ -50,7 +49,7 @@ pub async fn setup<N>(
|
||||
chain_spec: Arc<N::ChainSpec>,
|
||||
is_dev: bool,
|
||||
attributes_generator: impl Fn(u64) -> <<N as NodeTypes>::Payload as PayloadTypes>::PayloadBuilderAttributes + Send + Sync + Copy + 'static,
|
||||
) -> eyre::Result<(Vec<NodeHelperType<N>>, TaskManager, Wallet)>
|
||||
) -> eyre::Result<(Vec<NodeHelperType<N>>, Wallet)>
|
||||
where
|
||||
N: NodeBuilderHelper,
|
||||
{
|
||||
@@ -69,7 +68,6 @@ pub async fn setup_engine<N>(
|
||||
attributes_generator: impl Fn(u64) -> <<N as NodeTypes>::Payload as PayloadTypes>::PayloadBuilderAttributes + Send + Sync + Copy + 'static,
|
||||
) -> eyre::Result<(
|
||||
Vec<NodeHelperType<N, BlockchainProvider<NodeTypesWithDBAdapter<N, TmpDB>>>>,
|
||||
TaskManager,
|
||||
Wallet,
|
||||
)>
|
||||
where
|
||||
@@ -96,7 +94,6 @@ pub async fn setup_engine_with_connection<N>(
|
||||
connect_nodes: bool,
|
||||
) -> eyre::Result<(
|
||||
Vec<NodeHelperType<N, BlockchainProvider<NodeTypesWithDBAdapter<N, TmpDB>>>>,
|
||||
TaskManager,
|
||||
Wallet,
|
||||
)>
|
||||
where
|
||||
|
||||
@@ -14,7 +14,7 @@ use reth_node_core::args::{DiscoveryArgs, NetworkArgs, RpcServerArgs};
|
||||
use reth_primitives_traits::AlloyBlockHeader;
|
||||
use reth_provider::providers::BlockchainProvider;
|
||||
use reth_rpc_server_types::RpcModuleSelection;
|
||||
use reth_tasks::TaskManager;
|
||||
use reth_tasks::Runtime;
|
||||
use std::sync::Arc;
|
||||
use tracing::{span, Instrument, Level};
|
||||
|
||||
@@ -110,11 +110,9 @@ where
|
||||
self,
|
||||
) -> eyre::Result<(
|
||||
Vec<NodeHelperType<N, BlockchainProvider<NodeTypesWithDBAdapter<N, TmpDB>>>>,
|
||||
TaskManager,
|
||||
Wallet,
|
||||
)> {
|
||||
let tasks = TaskManager::current();
|
||||
let exec = tasks.executor();
|
||||
let runtime = Runtime::with_existing_handle(tokio::runtime::Handle::current())?;
|
||||
|
||||
let network_config = NetworkArgs {
|
||||
discovery: DiscoveryArgs { disable_discovery: true, ..DiscoveryArgs::default() },
|
||||
@@ -153,7 +151,7 @@ where
|
||||
let span = span!(Level::INFO, "node", idx);
|
||||
let node = N::default();
|
||||
let NodeHandle { node, node_exit_future: _ } = NodeBuilder::new(node_config)
|
||||
.testing_node(exec.clone())
|
||||
.testing_node(runtime.clone())
|
||||
.with_types_and_provider::<N, BlockchainProvider<_>>()
|
||||
.with_components(node.components_builder())
|
||||
.with_add_ons(node.add_ons())
|
||||
@@ -197,7 +195,7 @@ where
|
||||
}
|
||||
}
|
||||
|
||||
Ok((nodes, tasks, Wallet::default().with_chain_id(self.chain_spec.chain().into())))
|
||||
Ok((nodes, Wallet::default().with_chain_id(self.chain_spec.chain().into())))
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -15,7 +15,7 @@ use reth_provider::{
|
||||
};
|
||||
use reth_rpc_server_types::RpcModuleSelection;
|
||||
use reth_stages_types::StageId;
|
||||
use reth_tasks::TaskManager;
|
||||
use reth_tasks::Runtime;
|
||||
use std::{path::Path, sync::Arc};
|
||||
use tempfile::TempDir;
|
||||
use tracing::{debug, info, span, Level};
|
||||
@@ -24,8 +24,6 @@ use tracing::{debug, info, span, Level};
|
||||
pub struct ChainImportResult {
|
||||
/// The nodes that were created
|
||||
pub nodes: Vec<NodeHelperType<EthereumNode>>,
|
||||
/// The task manager
|
||||
pub task_manager: TaskManager,
|
||||
/// The wallet for testing
|
||||
pub wallet: Wallet,
|
||||
/// Temporary directories that must be kept alive for the duration of the test
|
||||
@@ -68,8 +66,7 @@ pub async fn setup_engine_with_chain_import(
|
||||
+ Copy
|
||||
+ 'static,
|
||||
) -> eyre::Result<ChainImportResult> {
|
||||
let tasks = TaskManager::current();
|
||||
let exec = tasks.executor();
|
||||
let runtime = Runtime::with_existing_handle(tokio::runtime::Handle::current())?;
|
||||
|
||||
let network_config = NetworkArgs {
|
||||
discovery: DiscoveryArgs { disable_discovery: true, ..DiscoveryArgs::default() },
|
||||
@@ -129,6 +126,7 @@ pub async fn setup_engine_with_chain_import(
|
||||
.with_default_tables()
|
||||
.build()
|
||||
.unwrap(),
|
||||
reth_tasks::Runtime::test(),
|
||||
)?;
|
||||
|
||||
// Initialize genesis if needed
|
||||
@@ -221,7 +219,7 @@ pub async fn setup_engine_with_chain_import(
|
||||
let node = EthereumNode::default();
|
||||
|
||||
let NodeHandle { node, node_exit_future: _ } = NodeBuilder::new(node_config.clone())
|
||||
.testing_node_with_datadir(exec.clone(), datadir.clone())
|
||||
.testing_node_with_datadir(runtime.clone(), datadir.clone())
|
||||
.with_types_and_provider::<EthereumNode, BlockchainProvider<_>>()
|
||||
.with_components(node.components_builder())
|
||||
.with_add_ons(node.add_ons())
|
||||
@@ -243,7 +241,6 @@ pub async fn setup_engine_with_chain_import(
|
||||
|
||||
Ok(ChainImportResult {
|
||||
nodes,
|
||||
task_manager: tasks,
|
||||
wallet: crate::Wallet::default().with_chain_id(chain_spec.chain.id()),
|
||||
_temp_dirs: temp_dirs,
|
||||
})
|
||||
@@ -333,6 +330,7 @@ mod tests {
|
||||
.with_default_tables()
|
||||
.build()
|
||||
.unwrap(),
|
||||
reth_tasks::Runtime::test(),
|
||||
)
|
||||
.expect("failed to create provider factory");
|
||||
|
||||
@@ -397,6 +395,7 @@ mod tests {
|
||||
.with_default_tables()
|
||||
.build()
|
||||
.unwrap(),
|
||||
reth_tasks::Runtime::test(),
|
||||
)
|
||||
.expect("failed to create provider factory");
|
||||
|
||||
@@ -497,6 +496,7 @@ mod tests {
|
||||
.with_default_tables()
|
||||
.build()
|
||||
.unwrap(),
|
||||
reth_tasks::Runtime::test(),
|
||||
)
|
||||
.expect("failed to create provider factory");
|
||||
|
||||
|
||||
@@ -1,6 +1,6 @@
|
||||
//! Test setup utilities for configuring the initial state.
|
||||
|
||||
use crate::{setup_engine_with_connection, testsuite::Environment, NodeBuilderHelper};
|
||||
use crate::{testsuite::Environment, E2ETestSetupBuilder, NodeBuilderHelper};
|
||||
use alloy_eips::BlockNumberOrTag;
|
||||
use alloy_primitives::B256;
|
||||
use alloy_rpc_types_engine::{ForkchoiceState, PayloadAttributes};
|
||||
@@ -38,6 +38,8 @@ pub struct Setup<I> {
|
||||
shutdown_tx: Option<mpsc::Sender<()>>,
|
||||
/// Is this setup in dev mode
|
||||
pub is_dev: bool,
|
||||
/// Whether to use v2 storage mode (hashed keys, static file changesets, rocksdb history)
|
||||
pub storage_v2: bool,
|
||||
/// Tracks instance generic.
|
||||
_phantom: PhantomData<I>,
|
||||
/// Holds the import result to keep nodes alive when using imported chain
|
||||
@@ -58,6 +60,7 @@ impl<I> Default for Setup<I> {
|
||||
tree_config: TreeConfig::default(),
|
||||
shutdown_tx: None,
|
||||
is_dev: true,
|
||||
storage_v2: false,
|
||||
_phantom: Default::default(),
|
||||
import_result_holder: None,
|
||||
import_rlp_path: None,
|
||||
@@ -126,6 +129,12 @@ where
|
||||
self
|
||||
}
|
||||
|
||||
/// Enable v2 storage mode (hashed keys, static file changesets, rocksdb history)
|
||||
pub const fn with_storage_v2(mut self) -> Self {
|
||||
self.storage_v2 = true;
|
||||
self
|
||||
}
|
||||
|
||||
/// Apply setup using pre-imported chain data from RLP file
|
||||
pub async fn apply_with_import<N>(
|
||||
&mut self,
|
||||
@@ -194,23 +203,32 @@ where
|
||||
self.shutdown_tx = Some(shutdown_tx);
|
||||
|
||||
let is_dev = self.is_dev;
|
||||
let storage_v2 = self.storage_v2;
|
||||
let node_count = self.network.node_count;
|
||||
let tree_config = self.tree_config.clone();
|
||||
|
||||
let attributes_generator = Self::create_static_attributes_generator::<N>();
|
||||
|
||||
let result = setup_engine_with_connection::<N>(
|
||||
let mut builder = E2ETestSetupBuilder::<N, _>::new(
|
||||
node_count,
|
||||
Arc::<N::ChainSpec>::new((*chain_spec).clone().into()),
|
||||
is_dev,
|
||||
self.tree_config.clone(),
|
||||
attributes_generator,
|
||||
self.network.connect_nodes,
|
||||
)
|
||||
.await;
|
||||
.with_tree_config_modifier(move |base| {
|
||||
tree_config.clone().with_cross_block_cache_size(base.cross_block_cache_size())
|
||||
})
|
||||
.with_node_config_modifier(move |config| config.set_dev(is_dev))
|
||||
.with_connect_nodes(self.network.connect_nodes);
|
||||
|
||||
if storage_v2 {
|
||||
builder = builder.with_storage_v2();
|
||||
}
|
||||
|
||||
let result = builder.build().await;
|
||||
|
||||
let mut node_clients = Vec::new();
|
||||
match result {
|
||||
Ok((nodes, executor, _wallet)) => {
|
||||
Ok((nodes, _wallet)) => {
|
||||
// create HTTP clients for each node's RPC and Engine API endpoints
|
||||
for node in &nodes {
|
||||
node_clients.push(node.to_node_client()?);
|
||||
@@ -218,12 +236,11 @@ where
|
||||
|
||||
// spawn a separate task just to handle the shutdown
|
||||
tokio::spawn(async move {
|
||||
// keep nodes and executor in scope to ensure they're not dropped
|
||||
// keep nodes in scope to ensure they're not dropped
|
||||
let _nodes = nodes;
|
||||
let _executor = executor;
|
||||
// Wait for shutdown signal
|
||||
let _ = shutdown_rx.recv().await;
|
||||
// nodes and executor will be dropped here when the test completes
|
||||
// nodes will be dropped here when the test completes
|
||||
});
|
||||
}
|
||||
Err(e) => {
|
||||
|
||||
@@ -370,15 +370,14 @@ async fn test_setup_builder_with_custom_tree_config() -> Result<()> {
|
||||
.build(),
|
||||
);
|
||||
|
||||
let (nodes, _tasks, _wallet) =
|
||||
E2ETestSetupBuilder::<EthereumNode, _>::new(1, chain_spec, |_| {
|
||||
EthPayloadBuilderAttributes::default()
|
||||
})
|
||||
.with_tree_config_modifier(|config| {
|
||||
config.with_persistence_threshold(0).with_memory_block_buffer_target(5)
|
||||
})
|
||||
.build()
|
||||
.await?;
|
||||
let (nodes, _wallet) = E2ETestSetupBuilder::<EthereumNode, _>::new(1, chain_spec, |_| {
|
||||
EthPayloadBuilderAttributes::default()
|
||||
})
|
||||
.with_tree_config_modifier(|config| {
|
||||
config.with_persistence_threshold(0).with_memory_block_buffer_target(5)
|
||||
})
|
||||
.build()
|
||||
.await?;
|
||||
|
||||
assert_eq!(nodes.len(), 1);
|
||||
|
||||
|
||||
@@ -10,7 +10,6 @@ use jsonrpsee::core::client::ClientT;
|
||||
use reth_chainspec::{ChainSpec, ChainSpecBuilder, MAINNET};
|
||||
use reth_db::tables;
|
||||
use reth_e2e_test_utils::{transaction::TransactionTestContext, wallet, E2ETestSetupBuilder};
|
||||
use reth_node_core::args::RocksDbArgs;
|
||||
use reth_node_ethereum::EthereumNode;
|
||||
use reth_payload_builder::EthPayloadBuilderAttributes;
|
||||
use reth_provider::RocksDBProviderFactory;
|
||||
@@ -96,22 +95,6 @@ fn test_attributes_generator(timestamp: u64) -> EthPayloadBuilderAttributes {
|
||||
EthPayloadBuilderAttributes::new(B256::ZERO, attributes)
|
||||
}
|
||||
|
||||
/// Verifies that `RocksDB` CLI defaults are `None` (deferred to storage mode).
|
||||
#[test]
|
||||
fn test_rocksdb_defaults_are_none() {
|
||||
let args = RocksDbArgs::default();
|
||||
|
||||
assert!(args.tx_hash.is_none(), "tx_hash default should be None (deferred to --storage.v2)");
|
||||
assert!(
|
||||
args.storages_history.is_none(),
|
||||
"storages_history default should be None (deferred to --storage.v2)"
|
||||
);
|
||||
assert!(
|
||||
args.account_history.is_none(),
|
||||
"account_history default should be None (deferred to --storage.v2)"
|
||||
);
|
||||
}
|
||||
|
||||
/// Smoke test: node boots with `RocksDB` routing enabled.
|
||||
#[tokio::test]
|
||||
async fn test_rocksdb_node_startup() -> Result<()> {
|
||||
@@ -119,7 +102,7 @@ async fn test_rocksdb_node_startup() -> Result<()> {
|
||||
|
||||
let chain_spec = test_chain_spec();
|
||||
|
||||
let (nodes, _tasks, _wallet) =
|
||||
let (nodes, _wallet) =
|
||||
E2ETestSetupBuilder::<EthereumNode, _>::new(1, chain_spec, test_attributes_generator)
|
||||
.with_storage_v2()
|
||||
.build()
|
||||
@@ -147,7 +130,7 @@ async fn test_rocksdb_block_mining() -> Result<()> {
|
||||
let chain_spec = test_chain_spec();
|
||||
let chain_id = chain_spec.chain().id();
|
||||
|
||||
let (mut nodes, _tasks, _wallet) =
|
||||
let (mut nodes, _wallet) =
|
||||
E2ETestSetupBuilder::<EthereumNode, _>::new(1, chain_spec, test_attributes_generator)
|
||||
.with_storage_v2()
|
||||
.build()
|
||||
@@ -201,7 +184,7 @@ async fn test_rocksdb_transaction_queries() -> Result<()> {
|
||||
let chain_spec = test_chain_spec();
|
||||
let chain_id = chain_spec.chain().id();
|
||||
|
||||
let (mut nodes, _tasks, _) = E2ETestSetupBuilder::<EthereumNode, _>::new(
|
||||
let (mut nodes, _) = E2ETestSetupBuilder::<EthereumNode, _>::new(
|
||||
1,
|
||||
chain_spec.clone(),
|
||||
test_attributes_generator,
|
||||
@@ -268,7 +251,7 @@ async fn test_rocksdb_multi_tx_same_block() -> Result<()> {
|
||||
let chain_spec = test_chain_spec();
|
||||
let chain_id = chain_spec.chain().id();
|
||||
|
||||
let (mut nodes, _tasks, _) = E2ETestSetupBuilder::<EthereumNode, _>::new(
|
||||
let (mut nodes, _) = E2ETestSetupBuilder::<EthereumNode, _>::new(
|
||||
1,
|
||||
chain_spec.clone(),
|
||||
test_attributes_generator,
|
||||
@@ -336,7 +319,7 @@ async fn test_rocksdb_txs_across_blocks() -> Result<()> {
|
||||
let chain_spec = test_chain_spec();
|
||||
let chain_id = chain_spec.chain().id();
|
||||
|
||||
let (mut nodes, _tasks, _) = E2ETestSetupBuilder::<EthereumNode, _>::new(
|
||||
let (mut nodes, _) = E2ETestSetupBuilder::<EthereumNode, _>::new(
|
||||
1,
|
||||
chain_spec.clone(),
|
||||
test_attributes_generator,
|
||||
@@ -421,7 +404,7 @@ async fn test_rocksdb_pending_tx_not_in_storage() -> Result<()> {
|
||||
let chain_spec = test_chain_spec();
|
||||
let chain_id = chain_spec.chain().id();
|
||||
|
||||
let (mut nodes, _tasks, _) = E2ETestSetupBuilder::<EthereumNode, _>::new(
|
||||
let (mut nodes, _) = E2ETestSetupBuilder::<EthereumNode, _>::new(
|
||||
1,
|
||||
chain_spec.clone(),
|
||||
test_attributes_generator,
|
||||
@@ -477,7 +460,7 @@ async fn test_rocksdb_pending_tx_not_in_storage() -> Result<()> {
|
||||
///
|
||||
/// This test exercises `unwind_trie_state_from` which previously failed with
|
||||
/// `UnsortedInput` errors because it read changesets directly from MDBX tables
|
||||
/// instead of using storage-aware methods that check `storage_changesets_in_static_files`.
|
||||
/// instead of using storage-aware methods that check `is_v2()`.
|
||||
#[tokio::test]
|
||||
async fn test_rocksdb_reorg_unwind() -> Result<()> {
|
||||
reth_tracing::init_test_tracing();
|
||||
@@ -485,7 +468,7 @@ async fn test_rocksdb_reorg_unwind() -> Result<()> {
|
||||
let chain_spec = test_chain_spec();
|
||||
let chain_id = chain_spec.chain().id();
|
||||
|
||||
let (mut nodes, _tasks, _) = E2ETestSetupBuilder::<EthereumNode, _>::new(
|
||||
let (mut nodes, _) = E2ETestSetupBuilder::<EthereumNode, _>::new(
|
||||
1,
|
||||
chain_spec.clone(),
|
||||
test_attributes_generator,
|
||||
|
||||
@@ -36,8 +36,6 @@ futures = { workspace = true, optional = true }
|
||||
auto_impl.workspace = true
|
||||
serde.workspace = true
|
||||
thiserror.workspace = true
|
||||
rand = { workspace = true, optional = true }
|
||||
tracing = { workspace = true, optional = true }
|
||||
|
||||
[features]
|
||||
default = ["std"]
|
||||
@@ -56,4 +54,3 @@ std = [
|
||||
"thiserror/std",
|
||||
"reth-evm/std",
|
||||
]
|
||||
debug-jitter = ["dep:rand", "dep:tracing"]
|
||||
|
||||
@@ -1,6 +1,7 @@
|
||||
//! Engine tree configuration.
|
||||
|
||||
use alloy_eips::merge::EPOCH_SLOTS;
|
||||
use core::time::Duration;
|
||||
|
||||
/// Triggers persistence when the number of canonical blocks in memory exceeds this threshold.
|
||||
pub const DEFAULT_PERSISTENCE_THRESHOLD: u64 = 2;
|
||||
@@ -64,6 +65,9 @@ pub const DEFAULT_SPARSE_TRIE_PRUNE_DEPTH: usize = 4;
|
||||
/// Storage tries beyond this limit are cleared (but allocations preserved).
|
||||
pub const DEFAULT_SPARSE_TRIE_MAX_STORAGE_TRIES: usize = 100;
|
||||
|
||||
/// Default timeout for the state root task before spawning a sequential fallback.
|
||||
pub const DEFAULT_STATE_ROOT_TASK_TIMEOUT: Duration = Duration::from_secs(1);
|
||||
|
||||
const DEFAULT_BLOCK_BUFFER_LIMIT: u32 = EPOCH_SLOTS as u32 * 2;
|
||||
const DEFAULT_MAX_INVALID_HEADER_CACHE_LENGTH: u32 = 256;
|
||||
const DEFAULT_MAX_EXECUTE_BLOCK_BATCH_SIZE: usize = 4;
|
||||
@@ -169,12 +173,19 @@ pub struct TreeConfig {
|
||||
disable_proof_v2: bool,
|
||||
/// Whether to disable cache metrics recording (can be expensive with large cached state).
|
||||
disable_cache_metrics: bool,
|
||||
/// Whether to enable sparse trie as cache.
|
||||
enable_sparse_trie_as_cache: bool,
|
||||
/// Whether to disable sparse trie cache.
|
||||
disable_trie_cache: bool,
|
||||
/// Depth for sparse trie pruning after state root computation.
|
||||
sparse_trie_prune_depth: usize,
|
||||
/// Maximum number of storage tries to retain after pruning.
|
||||
sparse_trie_max_storage_tries: usize,
|
||||
/// Whether to fully disable sparse trie cache pruning between blocks.
|
||||
disable_sparse_trie_cache_pruning: bool,
|
||||
/// Timeout for the state root task before spawning a sequential fallback computation.
|
||||
/// If `Some`, after waiting this duration for the state root task, a sequential state root
|
||||
/// computation is spawned in parallel and whichever finishes first is used.
|
||||
/// If `None`, the timeout fallback is disabled.
|
||||
state_root_task_timeout: Option<Duration>,
|
||||
}
|
||||
|
||||
impl Default for TreeConfig {
|
||||
@@ -204,9 +215,11 @@ impl Default for TreeConfig {
|
||||
account_worker_count: default_account_worker_count(),
|
||||
disable_proof_v2: false,
|
||||
disable_cache_metrics: false,
|
||||
enable_sparse_trie_as_cache: false,
|
||||
disable_trie_cache: false,
|
||||
sparse_trie_prune_depth: DEFAULT_SPARSE_TRIE_PRUNE_DEPTH,
|
||||
sparse_trie_max_storage_tries: DEFAULT_SPARSE_TRIE_MAX_STORAGE_TRIES,
|
||||
disable_sparse_trie_cache_pruning: false,
|
||||
state_root_task_timeout: Some(DEFAULT_STATE_ROOT_TASK_TIMEOUT),
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -241,6 +254,7 @@ impl TreeConfig {
|
||||
disable_cache_metrics: bool,
|
||||
sparse_trie_prune_depth: usize,
|
||||
sparse_trie_max_storage_tries: usize,
|
||||
state_root_task_timeout: Option<Duration>,
|
||||
) -> Self {
|
||||
Self {
|
||||
persistence_threshold,
|
||||
@@ -267,9 +281,11 @@ impl TreeConfig {
|
||||
account_worker_count,
|
||||
disable_proof_v2,
|
||||
disable_cache_metrics,
|
||||
enable_sparse_trie_as_cache: false,
|
||||
disable_trie_cache: false,
|
||||
sparse_trie_prune_depth,
|
||||
sparse_trie_max_storage_tries,
|
||||
disable_sparse_trie_cache_pruning: false,
|
||||
state_root_task_timeout,
|
||||
}
|
||||
}
|
||||
|
||||
@@ -586,14 +602,14 @@ impl TreeConfig {
|
||||
self
|
||||
}
|
||||
|
||||
/// Returns whether sparse trie as cache is enabled.
|
||||
pub const fn enable_sparse_trie_as_cache(&self) -> bool {
|
||||
self.enable_sparse_trie_as_cache
|
||||
/// Returns whether sparse trie cache is disabled.
|
||||
pub const fn disable_trie_cache(&self) -> bool {
|
||||
self.disable_trie_cache
|
||||
}
|
||||
|
||||
/// Setter for whether to enable sparse trie as cache.
|
||||
pub const fn with_enable_sparse_trie_as_cache(mut self, value: bool) -> Self {
|
||||
self.enable_sparse_trie_as_cache = value;
|
||||
/// Setter for whether to disable sparse trie cache.
|
||||
pub const fn with_disable_trie_cache(mut self, value: bool) -> Self {
|
||||
self.disable_trie_cache = value;
|
||||
self
|
||||
}
|
||||
|
||||
@@ -618,4 +634,26 @@ impl TreeConfig {
|
||||
self.sparse_trie_max_storage_tries = max_tries;
|
||||
self
|
||||
}
|
||||
|
||||
/// Returns whether sparse trie cache pruning is disabled.
|
||||
pub const fn disable_sparse_trie_cache_pruning(&self) -> bool {
|
||||
self.disable_sparse_trie_cache_pruning
|
||||
}
|
||||
|
||||
/// Setter for whether to disable sparse trie cache pruning.
|
||||
pub const fn with_disable_sparse_trie_cache_pruning(mut self, value: bool) -> Self {
|
||||
self.disable_sparse_trie_cache_pruning = value;
|
||||
self
|
||||
}
|
||||
|
||||
/// Returns the state root task timeout.
|
||||
pub const fn state_root_task_timeout(&self) -> Option<Duration> {
|
||||
self.state_root_task_timeout
|
||||
}
|
||||
|
||||
/// Setter for state root task timeout.
|
||||
pub const fn with_state_root_task_timeout(mut self, timeout: Option<Duration>) -> Self {
|
||||
self.state_root_task_timeout = timeout;
|
||||
self
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1,54 +0,0 @@
|
||||
//! Debug jitter utilities for testing timing-related bugs.
|
||||
//!
|
||||
//! When the `debug-jitter` feature is enabled, various components can add
|
||||
//! random delays to help trigger out-of-order timing bugs that may only
|
||||
//! manifest in real-world conditions.
|
||||
//!
|
||||
//! Control via environment variable:
|
||||
//! - `RETH_DEBUG_JITTER_MS`: Maximum jitter in milliseconds (0-N random delay)
|
||||
//!
|
||||
//! Example: `RETH_DEBUG_JITTER_MS=5` adds 0-5ms random delays.
|
||||
|
||||
use std::{sync::OnceLock, thread, time::Duration};
|
||||
|
||||
use rand::Rng;
|
||||
use tracing::trace;
|
||||
|
||||
/// Cached jitter configuration from environment.
|
||||
static JITTER_CONFIG: OnceLock<Option<u64>> = OnceLock::new();
|
||||
|
||||
/// Reads the jitter configuration from environment variables.
|
||||
///
|
||||
/// Returns `Some(max_ms)` if jitter is enabled, `None` otherwise.
|
||||
fn get_jitter_config() -> Option<u64> {
|
||||
*JITTER_CONFIG.get_or_init(|| {
|
||||
std::env::var("RETH_DEBUG_JITTER_MS")
|
||||
.ok()
|
||||
.and_then(|v| v.parse::<u64>().ok())
|
||||
.filter(|&ms| ms > 0)
|
||||
})
|
||||
}
|
||||
|
||||
/// Applies a random jitter delay if configured.
|
||||
///
|
||||
/// When `RETH_DEBUG_JITTER_MS` is set to a positive value N,
|
||||
/// this function sleeps for a random duration between 0 and N milliseconds.
|
||||
///
|
||||
/// This is useful for testing timing-sensitive code paths that may have
|
||||
/// race conditions or ordering bugs that only manifest with variable latencies.
|
||||
///
|
||||
/// The `context` parameter is used for logging to identify where jitter was applied.
|
||||
pub fn maybe_apply_jitter(context: &str) {
|
||||
if let Some(max_ms) = get_jitter_config() {
|
||||
let delay_ms = rand::rng().random_range(0..=max_ms);
|
||||
if delay_ms > 0 {
|
||||
trace!(
|
||||
target: "reth::jitter",
|
||||
context,
|
||||
delay_ms,
|
||||
"Applying debug jitter delay"
|
||||
);
|
||||
thread::sleep(Duration::from_millis(delay_ms));
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -23,7 +23,7 @@ use serde::{de::DeserializeOwned, Serialize};
|
||||
|
||||
// Re-export [`ExecutionPayload`] moved to `reth_payload_primitives`
|
||||
#[cfg(feature = "std")]
|
||||
pub use reth_evm::{ConfigureEngineEvm, ExecutableTxIterator, ExecutableTxTuple};
|
||||
pub use reth_evm::{ConfigureEngineEvm, ConvertTx, ExecutableTxIterator, ExecutableTxTuple};
|
||||
pub use reth_payload_primitives::ExecutionPayload;
|
||||
|
||||
mod error;
|
||||
@@ -46,10 +46,6 @@ pub use invalid_block_hook::{InvalidBlockHook, InvalidBlockHooks, NoopInvalidBlo
|
||||
pub mod config;
|
||||
pub use config::*;
|
||||
|
||||
/// Debug jitter utilities for testing timing-related bugs.
|
||||
#[cfg(feature = "debug-jitter")]
|
||||
pub mod jitter;
|
||||
|
||||
/// This type defines the versioned types of the engine API based on the [ethereum engine API](https://github.com/ethereum/execution-apis/tree/main/src/engine).
|
||||
///
|
||||
/// This includes the execution payload types and payload attributes that are used to trigger a
|
||||
|
||||
@@ -20,7 +20,7 @@ use reth_node_types::{BlockTy, NodeTypes};
|
||||
use reth_payload_builder::PayloadBuilderHandle;
|
||||
use reth_provider::{
|
||||
providers::{BlockchainProvider, ProviderNodeTypes},
|
||||
ProviderFactory,
|
||||
ProviderFactory, StorageSettingsCache,
|
||||
};
|
||||
use reth_prune::PrunerWithFactory;
|
||||
use reth_stages_api::{MetricEventsSender, Pipeline};
|
||||
@@ -94,6 +94,7 @@ where
|
||||
if chain_spec.is_optimism() { EngineApiKind::OpStack } else { EngineApiKind::Ethereum };
|
||||
|
||||
let downloader = BasicBlockDownloader::new(client, consensus.clone());
|
||||
let use_hashed_state = provider.cached_storage_settings().use_hashed_state();
|
||||
|
||||
let persistence_handle =
|
||||
PersistenceHandle::<N::Primitives>::spawn_service(provider, pruner, sync_metrics_tx);
|
||||
@@ -111,6 +112,7 @@ where
|
||||
engine_kind,
|
||||
evm_config,
|
||||
changeset_cache,
|
||||
use_hashed_state,
|
||||
);
|
||||
|
||||
let engine_handler = EngineApiRequestHandler::new(to_tree_tx, from_tree);
|
||||
@@ -201,6 +203,7 @@ mod tests {
|
||||
TreeConfig::default(),
|
||||
Box::new(NoopInvalidBlockHook::default()),
|
||||
changeset_cache.clone(),
|
||||
reth_tasks::Runtime::test(),
|
||||
);
|
||||
|
||||
let (sync_metrics_tx, _sync_metrics_rx) = unbounded_channel();
|
||||
|
||||
@@ -27,12 +27,11 @@ reth-primitives-traits = { workspace = true, features = ["rayon", "dashmap"] }
|
||||
reth-ethereum-primitives.workspace = true
|
||||
reth-provider.workspace = true
|
||||
reth-prune.workspace = true
|
||||
reth-revm.workspace = true
|
||||
reth-revm = { workspace = true, features = ["optional-balance-check"] }
|
||||
reth-stages-api.workspace = true
|
||||
reth-tasks.workspace = true
|
||||
reth-trie-parallel.workspace = true
|
||||
reth-trie-sparse = { workspace = true, features = ["std", "metrics"] }
|
||||
reth-trie-sparse-parallel = { workspace = true, features = ["std"] }
|
||||
reth-trie.workspace = true
|
||||
reth-trie-common.workspace = true
|
||||
reth-trie-db.workspace = true
|
||||
@@ -116,7 +115,6 @@ name = "state_root_task"
|
||||
harness = false
|
||||
|
||||
[features]
|
||||
debug-jitter = ["reth-engine-primitives/debug-jitter", "reth-trie-parallel/debug-jitter"]
|
||||
test-utils = [
|
||||
"reth-chain-state/test-utils",
|
||||
"reth-chainspec/test-utils",
|
||||
@@ -143,7 +141,15 @@ test-utils = [
|
||||
"reth-ethereum-primitives/test-utils",
|
||||
"reth-node-ethereum/test-utils",
|
||||
"reth-evm-ethereum/test-utils",
|
||||
"reth-tasks/test-utils",
|
||||
]
|
||||
rocksdb = [
|
||||
"reth-provider/rocksdb",
|
||||
"reth-prune/rocksdb",
|
||||
"reth-stages?/rocksdb",
|
||||
"reth-e2e-test-utils/rocksdb",
|
||||
]
|
||||
edge = ["rocksdb"]
|
||||
|
||||
[[test]]
|
||||
name = "e2e_testsuite"
|
||||
|
||||
@@ -12,8 +12,7 @@ use rand::Rng;
|
||||
use reth_chainspec::ChainSpec;
|
||||
use reth_db_common::init::init_genesis;
|
||||
use reth_engine_tree::tree::{
|
||||
executor::WorkloadExecutor, precompile_cache::PrecompileCacheMap, PayloadProcessor,
|
||||
StateProviderBuilder, TreeConfig,
|
||||
precompile_cache::PrecompileCacheMap, PayloadProcessor, StateProviderBuilder, TreeConfig,
|
||||
};
|
||||
use reth_ethereum_primitives::TransactionSigned;
|
||||
use reth_evm::OnStateHook;
|
||||
@@ -219,7 +218,7 @@ fn bench_state_root(c: &mut Criterion) {
|
||||
setup_provider(&factory, &state_updates).expect("failed to setup provider");
|
||||
|
||||
let payload_processor = PayloadProcessor::new(
|
||||
WorkloadExecutor::default(),
|
||||
reth_tasks::Runtime::test(),
|
||||
EthEvmConfig::new(factory.chain_spec()),
|
||||
&TreeConfig::default(),
|
||||
PrecompileCacheMap::default(),
|
||||
|
||||
@@ -138,7 +138,7 @@ impl<N: ProviderNodeTypes> PipelineSync<N> {
|
||||
let (tx, rx) = oneshot::channel();
|
||||
|
||||
let pipeline = pipeline.take().expect("exists");
|
||||
self.pipeline_task_spawner.spawn_critical_blocking(
|
||||
self.pipeline_task_spawner.spawn_critical_blocking_task(
|
||||
"pipeline task",
|
||||
Box::pin(async move {
|
||||
let result = pipeline.run_as_fut(Some(target)).await;
|
||||
|
||||
@@ -9,7 +9,7 @@ use reth_provider::{
|
||||
providers::ProviderNodeTypes, BlockExecutionWriter, BlockHashReader, ChainStateBlockWriter,
|
||||
DBProvider, DatabaseProviderFactory, ProviderFactory, SaveBlocksMode,
|
||||
};
|
||||
use reth_prune::{PrunerError, PrunerOutput, PrunerWithFactory};
|
||||
use reth_prune::{PrunerError, PrunerWithFactory};
|
||||
use reth_stages_api::{MetricEvent, MetricEventsSender};
|
||||
use reth_tasks::spawn_os_thread;
|
||||
use std::{
|
||||
@@ -74,18 +74,6 @@ where
|
||||
pending_safe_block: None,
|
||||
}
|
||||
}
|
||||
|
||||
/// Prunes block data before the given block number according to the configured prune
|
||||
/// configuration.
|
||||
#[instrument(level = "debug", target = "engine::persistence", skip_all, fields(block_num))]
|
||||
fn prune_before(&mut self, block_num: u64) -> Result<PrunerOutput, PrunerError> {
|
||||
debug!(target: "engine::persistence", ?block_num, "Running pruner");
|
||||
let start_time = Instant::now();
|
||||
// TODO: doing this properly depends on pruner segment changes
|
||||
let result = self.pruner.run(block_num);
|
||||
self.metrics.prune_before_duration_seconds.record(start_time.elapsed());
|
||||
result
|
||||
}
|
||||
}
|
||||
|
||||
impl<N> PersistenceService<N>
|
||||
@@ -118,11 +106,6 @@ where
|
||||
let _ = self
|
||||
.sync_metrics_tx
|
||||
.send(MetricEvent::SyncHeight { height: block_number });
|
||||
|
||||
if self.pruner.is_pruning_needed(block_number) {
|
||||
// We log `PrunerOutput` inside the `Pruner`
|
||||
let _ = self.prune_before(block_number)?;
|
||||
}
|
||||
}
|
||||
}
|
||||
PersistenceAction::SaveFinalizedBlock(finalized_block) => {
|
||||
@@ -163,7 +146,6 @@ where
|
||||
let last_block = blocks.last().map(|b| b.recovered_block.num_hash());
|
||||
let block_count = blocks.len();
|
||||
|
||||
// Take any pending finalized/safe block updates to commit together
|
||||
let pending_finalized = self.pending_finalized_block.take();
|
||||
let pending_safe = self.pending_safe_block.take();
|
||||
|
||||
@@ -171,11 +153,10 @@ where
|
||||
|
||||
let start_time = Instant::now();
|
||||
|
||||
if last_block.is_some() {
|
||||
if let Some(last) = last_block {
|
||||
let provider_rw = self.provider.database_provider_rw()?;
|
||||
provider_rw.save_blocks(blocks, SaveBlocksMode::Full)?;
|
||||
|
||||
// Commit pending finalized/safe block updates in the same transaction
|
||||
if let Some(finalized) = pending_finalized {
|
||||
provider_rw.save_finalized_block_number(finalized)?;
|
||||
}
|
||||
@@ -183,6 +164,13 @@ where
|
||||
provider_rw.save_safe_block_number(safe)?;
|
||||
}
|
||||
|
||||
if self.pruner.is_pruning_needed(last.number) {
|
||||
debug!(target: "engine::persistence", block_num=?last.number, "Running pruner");
|
||||
let prune_start = Instant::now();
|
||||
let _ = self.pruner.run_with_provider(&provider_rw, last.number)?;
|
||||
self.metrics.prune_before_duration_seconds.record(prune_start.elapsed());
|
||||
}
|
||||
|
||||
provider_rw.commit()?;
|
||||
}
|
||||
|
||||
|
||||
Some files were not shown because too many files have changed in this diff Show More
Reference in New Issue
Block a user