Compare commits

..

3 Commits

Author SHA1 Message Date
yongkangc
aab08b82ea perf: use synchronous state root for small blocks
Amp-Thread-ID: https://ampcode.com/threads/T-019c649c-0e7d-74ce-9e41-be800680229a
2026-02-16 04:06:31 +00:00
yongkangc
6d357545ef perf: inline receipt root for small payloads
Amp-Thread-ID: https://ampcode.com/threads/T-019c6479-e793-7149-9d33-d7cb75d0adc1
2026-02-16 03:44:51 +00:00
yongkangc
2740793e42 perf(engine): reduce save_cache write lock hold time
Move insert_state, update_metrics, and valid_block_rx.recv() outside
the PayloadExecutionCache write lock. A short take() under the lock
first detaches the published cache so readers see None during the
update window, preventing observation of partially-mutated Arc-shared
ExecutionCache state.

Amp-Thread-ID: https://ampcode.com/threads/T-019c42d4-fff9-727c-8256-eb3684deaba4
2026-02-16 02:48:53 +00:00
477 changed files with 14901 additions and 17082 deletions

View File

@@ -1,10 +0,0 @@
---
reth-engine-primitives: patch
reth-engine-tree: patch
reth-node-core: patch
reth-trie-parallel: minor
---
Removed legacy proof calculation system and V2-specific configuration flags.
Removed the legacy (non-V2) proof calculation code paths, simplified multiproof task architecture by removing the dual-mode system, and cleaned up V2-specific CLI flags (`--engine.disable-proof-v2`, `--engine.disable-trie-cache`) that are no longer needed. The codebase now exclusively uses V2 proofs with the sparse trie cache.

View File

@@ -1,5 +0,0 @@
---
reth-trie-sparse: patch
---
Refactored sparse trie node state tracking to use RLP nodes instead of hashes. Replaced `Option<B256>` hash fields with `SparseNodeState` enum that tracks either dirty nodes or cached RLP nodes with optional database storage flags. Added debug assertions to validate leaf path lengths and improved pruning logic to use node paths directly instead of path-hash tuples.

View File

@@ -1,5 +0,0 @@
---
reth-transaction-pool: minor
---
Added support for optional custom stateless and stateful validation hooks in `EthTransactionValidator` via `set_additional_stateless_validation` and `set_additional_stateful_validation` methods. Also implemented a manual `Debug` impl to handle the non-`Debug` function pointer fields.

View File

@@ -1,5 +0,0 @@
---
reth-trie-sparse: patch
---
Added recording of `SetRoot` operation in `ParallelSparseTrie::set_root` when the `trie-debug` feature is enabled.

View File

@@ -1,6 +0,0 @@
---
reth-rpc-convert: minor
reth-storage-rpc-provider: minor
---
Replaced the separate `TryFromBlockResponse`, `TryFromReceiptResponse`, and `TryFromTransactionResponse` traits with a unified `RpcResponseConverter` trait and default `EthRpcConverter` implementation. Removed the `op-alloy-network` dependency and refactored `RpcBlockchainProvider` to store a dynamic converter instance instead of relying on per-type trait bounds.

View File

@@ -1,5 +0,0 @@
---
reth-rpc-eth-types: patch
---
Updated `eth_simulateV1` revert error code from `-32000` to `3` to be consistent with `eth_call`, per [execution-apis#748](https://github.com/ethereum/execution-apis/pull/748).

View File

@@ -1,5 +0,0 @@
---
reth-engine-tree: patch
---
Added sub-phase timing histograms to the sparse trie event loop, tracking channel wait, proof coalescing, multiproof reveal, and trie update durations separately.

View File

@@ -1,5 +0,0 @@
---
reth-engine-tree: patch
---
Fixed `compare_trie_updates` to return `bool` indicating whether differences were found, and updated the caller to properly use the return value instead of treating all successful comparisons as having no differences.

View File

@@ -1,5 +0,0 @@
---
reth-node-core: minor
---
Added `with_dev_block_time` helper method to `NodeConfig` for configuring dev miner block production interval.

View File

@@ -1,5 +0,0 @@
---
reth-db-api: patch
---
Changed `StoredNibblesSubKey` encoding to use a stack-allocated `[u8; 65]` array instead of a heap-allocated `Vec<u8>`, avoiding unnecessary heap allocation.

View File

@@ -1,8 +0,0 @@
---
reth: patch
reth-engine-tree: patch
reth-node-builder: patch
reth-trie-sparse: minor
---
Added `trie-debug` feature for recording sparse trie mutations to aid in debugging state root mismatches.

View File

@@ -1,5 +0,0 @@
---
reth-provider: patch
---
Fixed sender pruning during block reorg to skip when sender_recovery is fully pruned, preventing a fatal crash when no sender data exists in static files.

View File

@@ -1,7 +0,0 @@
---
reth-network-types: minor
reth-network: minor
reth-node-core: patch
---
Added `PersistedPeerInfo` struct to persist richer peer metadata (kind, fork ID, reputation) to disk. Updated `PeersConfig::with_basic_nodes_from_file` to support both the new `PersistedPeerInfo` format and the legacy `Vec<NodeRecord>` format with automatic conversion, and updated `write_peers_to_file` to exclude backed-off and banned peers.

View File

@@ -1,5 +0,0 @@
---
reth-network: minor
---
Added `fork_id` as a tiebreaker in peer selection when reputations are equal, preferring peers with a discovered `fork_id` as it indicates fork compatibility. Added a test to verify the tiebreaker behavior.

View File

@@ -1,5 +0,0 @@
---
reth-trie-sparse: patch
---
Fixed a bug where trie nodes could appear in both `updated_nodes` and `removed_nodes` simultaneously by removing entries from `removed_nodes` when a node is inserted as updated.

View File

@@ -1,6 +0,0 @@
---
reth-engine-local: minor
reth-node-builder: minor
---
Added trigger-based `MiningMode` variant that allows blocks to be built on-demand via custom streams, and exposed `with_mining_mode` method on `DebugNodeLauncherFuture` to override default mining configuration.

View File

@@ -1,5 +0,0 @@
---
reth-transaction-pool: patch
---
Fixed a bug where transactions from the same sender were added to the pending subpool out of nonce order. Ensured `process_updates` runs before `add_new_transaction` so that lower-nonce promotions are enqueued before the newly inserted higher-nonce transaction, preserving correct ordering for live `BestTransactions` iterators.

View File

@@ -1,6 +1,6 @@
[profile.default]
retries = { backoff = "exponential", count = 2, delay = "2s", jitter = true }
slow-timeout = { period = "30s", terminate-after = 2 }
slow-timeout = { period = "30s", terminate-after = 4 }
[[profile.default.overrides]]
filter = "test(general_state_tests)"

2
.github/CODEOWNERS vendored
View File

@@ -1,7 +1,7 @@
* @gakonst
crates/chain-state/ @fgimenez @mattsse
crates/chainspec/ @Rjected @joshieDo @mattsse
crates/cli/ @mattsse @Rjected
crates/cli/ @mattsse
crates/config/ @shekhirin @mattsse @Rjected
crates/consensus/ @mattsse @Rjected
crates/e2e-test-utils/ @mattsse @Rjected @klkvr @fgimenez

View File

@@ -1,99 +0,0 @@
#!/usr/bin/env bash
#
# Builds (or fetches from cache) reth binaries for benchmarking.
#
# Usage: bench-reth-build.sh <baseline|feature> <source-dir> <commit> [branch-sha]
#
# baseline — build/fetch the baseline binary at <commit> (merge-base)
# source-dir must be checked out at <commit>
# feature — build/fetch the candidate binary + reth-bench at <commit>
# source-dir must be checked out at <commit>
# optional branch-sha is the PR head commit for cache key
#
# Outputs:
# baseline: <source-dir>/target/profiling/reth
# feature: <source-dir>/target/profiling/reth, reth-bench installed to cargo bin
#
# Required: mc (MinIO client) with a configured alias
set -euo pipefail
MC="mc"
MODE="$1"
SOURCE_DIR="$2"
COMMIT="$3"
# Verify a cached reth binary was built from the expected commit.
# `reth --version` outputs "Commit SHA: <full-sha>" on its own line.
verify_binary() {
local binary="$1" expected_commit="$2"
local version binary_sha
version=$("$binary" --version 2>/dev/null) || return 1
binary_sha=$(echo "$version" | sed -n 's/^Commit SHA: *//p')
if [ -z "$binary_sha" ]; then
echo "Warning: could not extract commit SHA from version output"
return 1
fi
if [ "$binary_sha" = "$expected_commit" ]; then
return 0
fi
echo "Cache mismatch: binary built from ${binary_sha} but expected ${expected_commit}"
return 1
}
case "$MODE" in
baseline|main)
BUCKET="minio/reth-binaries/${COMMIT}"
mkdir -p "${SOURCE_DIR}/target/profiling"
CACHE_VALID=false
if $MC stat "${BUCKET}/reth" &>/dev/null; then
echo "Cache hit for baseline (${COMMIT}), downloading binary..."
$MC cp "${BUCKET}/reth" "${SOURCE_DIR}/target/profiling/reth"
chmod +x "${SOURCE_DIR}/target/profiling/reth"
if verify_binary "${SOURCE_DIR}/target/profiling/reth" "${COMMIT}"; then
CACHE_VALID=true
else
echo "Cached baseline binary is stale, rebuilding..."
fi
fi
if [ "$CACHE_VALID" = false ]; then
echo "Building baseline (${COMMIT}) from source..."
cd "${SOURCE_DIR}"
cargo build --profile profiling --bin reth
$MC cp target/profiling/reth "${BUCKET}/reth"
fi
;;
feature|branch)
BRANCH_SHA="${4:-$COMMIT}"
BUCKET="minio/reth-binaries/${BRANCH_SHA}"
CACHE_VALID=false
if $MC stat "${BUCKET}/reth" &>/dev/null && $MC stat "${BUCKET}/reth-bench" &>/dev/null; then
echo "Cache hit for ${BRANCH_SHA}, downloading binaries..."
mkdir -p "${SOURCE_DIR}/target/profiling"
$MC cp "${BUCKET}/reth" "${SOURCE_DIR}/target/profiling/reth"
$MC cp "${BUCKET}/reth-bench" /home/ubuntu/.cargo/bin/reth-bench
chmod +x "${SOURCE_DIR}/target/profiling/reth" /home/ubuntu/.cargo/bin/reth-bench
if verify_binary "${SOURCE_DIR}/target/profiling/reth" "${COMMIT}"; then
CACHE_VALID=true
else
echo "Cached feature binary is stale, rebuilding..."
fi
fi
if [ "$CACHE_VALID" = false ]; then
echo "Building feature (${COMMIT}) from source..."
cd "${SOURCE_DIR}"
rustup show active-toolchain || rustup default stable
make profiling
make install-reth-bench
$MC cp target/profiling/reth "${BUCKET}/reth"
$MC cp "$(which reth-bench)" "${BUCKET}/reth-bench"
fi
;;
*)
echo "Usage: $0 <baseline|feature> <source-dir> <commit> [branch-sha]"
exit 1
;;
esac

View File

@@ -1,260 +0,0 @@
#!/usr/bin/env python3
"""Generate benchmark charts from reth-bench CSV output.
Usage:
bench-engine-charts.py <combined_csv> --output-dir <dir> [--baseline <baseline_csv>]
Generates three PNG charts:
1. newPayload latency + Ggas/s per block (+ latency diff when baseline present)
2. Wait breakdown (persistence, execution cache, sparse trie) per block
3. Scatter plot of gas used vs latency
When --baseline is provided, charts overlay both datasets for comparison.
"""
import argparse
import csv
import sys
from pathlib import Path
import numpy as np
try:
import matplotlib
matplotlib.use("Agg")
import matplotlib.pyplot as plt
except ImportError:
print("matplotlib is required: pip install matplotlib", file=sys.stderr)
sys.exit(1)
GIGAGAS = 1_000_000_000
def parse_combined_csv(path: str) -> list[dict]:
rows = []
with open(path) as f:
reader = csv.DictReader(f)
for row in reader:
rows.append(
{
"block_number": int(row["block_number"]),
"gas_used": int(row["gas_used"]),
"new_payload_latency_us": int(row["new_payload_latency"]),
"persistence_wait_us": int(row["persistence_wait"])
if row.get("persistence_wait")
else None,
"execution_cache_wait_us": int(row.get("execution_cache_wait", 0)),
"sparse_trie_wait_us": int(row.get("sparse_trie_wait", 0)),
}
)
return rows
def plot_latency_and_throughput(
feature: list[dict], baseline: list[dict] | None, out: Path,
baseline_name: str = "baseline", feature_name: str = "feature",
):
num_plots = 3 if baseline else 2
fig, axes = plt.subplots(num_plots, 1, figsize=(12, 4 * num_plots), sharex=True)
ax1, ax2 = axes[0], axes[1]
feat_x = [r["block_number"] for r in feature]
feat_lat = [r["new_payload_latency_us"] / 1_000 for r in feature]
feat_ggas = []
for r in feature:
lat_s = r["new_payload_latency_us"] / 1_000_000
feat_ggas.append(r["gas_used"] / lat_s / GIGAGAS if lat_s > 0 else 0)
if baseline:
base_x = [r["block_number"] for r in baseline]
base_lat = [r["new_payload_latency_us"] / 1_000 for r in baseline]
base_ggas = []
for r in baseline:
lat_s = r["new_payload_latency_us"] / 1_000_000
base_ggas.append(r["gas_used"] / lat_s / GIGAGAS if lat_s > 0 else 0)
l, = ax1.plot(base_x, base_lat, linewidth=0.8, label=baseline_name, alpha=0.7)
ax1.axhline(np.median(base_lat), color=l.get_color(), linestyle="--", linewidth=1, alpha=0.7, label=f"{baseline_name} median")
l, = ax2.plot(base_x, base_ggas, linewidth=0.8, label=baseline_name, alpha=0.7)
ax2.axhline(np.median(base_ggas), color=l.get_color(), linestyle="--", linewidth=1, alpha=0.7, label=f"{baseline_name} median")
l, = ax1.plot(feat_x, feat_lat, linewidth=0.8, label=feature_name)
ax1.axhline(np.median(feat_lat), color=l.get_color(), linestyle="--", linewidth=1, label=f"{feature_name} median")
ax1.set_ylabel("Latency (ms)")
ax1.set_title("newPayload Latency per Block")
ax1.grid(True, alpha=0.3)
ax1.legend()
l, = ax2.plot(feat_x, feat_ggas, linewidth=0.8, label=feature_name)
ax2.axhline(np.median(feat_ggas), color=l.get_color(), linestyle="--", linewidth=1, label=f"{feature_name} median")
ax2.set_ylabel("Ggas/s")
ax2.set_title("Execution Throughput per Block")
ax2.grid(True, alpha=0.3)
ax2.legend()
if baseline:
ax3 = axes[2]
base_by_block = {r["block_number"]: r["new_payload_latency_us"] for r in baseline}
blocks, diffs = [], []
for r in feature:
bn = r["block_number"]
if bn in base_by_block and base_by_block[bn] > 0:
pct = (r["new_payload_latency_us"] - base_by_block[bn]) / base_by_block[bn] * 100
blocks.append(bn)
diffs.append(pct)
if blocks:
colors = ["green" if d <= 0 else "red" for d in diffs]
ax3.bar(blocks, diffs, width=1.0, color=colors, alpha=0.7, edgecolor="none")
ax3.axhline(0, color="black", linewidth=0.5)
ax3.set_ylabel("Δ Latency (%)")
ax3.set_title("Per-Block newPayload Latency Change (feature vs baseline)")
ax3.grid(True, alpha=0.3, axis="y")
axes[-1].set_xlabel("Block Number")
fig.tight_layout()
fig.savefig(out, dpi=150)
plt.close(fig)
def plot_wait_breakdown(
feature: list[dict], baseline: list[dict] | None, out: Path,
baseline_name: str = "baseline", feature_name: str = "feature",
):
series = [
("Persistence Wait", "persistence_wait_us"),
("State Cache Wait", "execution_cache_wait_us"),
("Trie Cache Wait", "sparse_trie_wait_us"),
]
fig, axes = plt.subplots(len(series), 1, figsize=(12, 3 * len(series)), sharex=True)
for ax, (label, key) in zip(axes, series):
if baseline:
bx = [r["block_number"] for r in baseline if r[key] is not None]
by = [r[key] / 1_000 for r in baseline if r[key] is not None]
if bx:
ax.plot(bx, by, linewidth=0.8, label=baseline_name, alpha=0.7)
fx = [r["block_number"] for r in feature if r[key] is not None]
fy = [r[key] / 1_000 for r in feature if r[key] is not None]
if fx:
ax.plot(fx, fy, linewidth=0.8, label=feature_name)
ax.set_ylabel("ms")
ax.set_title(label)
ax.grid(True, alpha=0.3)
if baseline:
ax.legend()
axes[-1].set_xlabel("Block Number")
fig.suptitle("Wait Time Breakdown per Block", fontsize=14, y=1.01)
fig.tight_layout()
fig.savefig(out, dpi=150, bbox_inches="tight")
plt.close(fig)
def _add_regression(ax, x, y, color, label):
"""Add a linear regression line to the axes."""
if len(x) < 2:
return
xa, ya = np.array(x), np.array(y)
m, b = np.polyfit(xa, ya, 1)
x_range = np.linspace(xa.min(), xa.max(), 100)
ax.plot(x_range, m * x_range + b, color=color, linewidth=1.5, alpha=0.8,
label=label)
def plot_gas_vs_latency(
feature: list[dict], baseline: list[dict] | None, out: Path,
baseline_name: str = "baseline", feature_name: str = "feature",
):
fig, ax = plt.subplots(figsize=(8, 6))
if baseline:
bgas = [r["gas_used"] / 1_000_000 for r in baseline]
blat = [r["new_payload_latency_us"] / 1_000 for r in baseline]
ax.scatter(bgas, blat, s=8, alpha=0.5)
_add_regression(ax, bgas, blat, "tab:blue", baseline_name)
fgas = [r["gas_used"] / 1_000_000 for r in feature]
flat = [r["new_payload_latency_us"] / 1_000 for r in feature]
ax.scatter(fgas, flat, s=8, alpha=0.6)
_add_regression(ax, fgas, flat, "tab:orange", feature_name)
ax.set_xlabel("Gas Used (Mgas)")
ax.set_ylabel("newPayload Latency (ms)")
ax.set_title("Gas Used vs Latency")
ax.grid(True, alpha=0.3)
ax.legend()
fig.tight_layout()
fig.savefig(out, dpi=150)
plt.close(fig)
def merge_csvs(paths: list[str]) -> list[dict]:
"""Parse and merge multiple CSVs, averaging values for duplicate blocks."""
by_block: dict[int, list[dict]] = {}
for path in paths:
for row in parse_combined_csv(path):
by_block.setdefault(row["block_number"], []).append(row)
merged = []
for bn in sorted(by_block):
rows = by_block[bn]
if len(rows) == 1:
merged.append(rows[0])
else:
avg = {"block_number": bn}
for key in ("gas_used", "new_payload_latency_us"):
avg[key] = int(sum(r[key] for r in rows) / len(rows))
for key in ("persistence_wait_us", "execution_cache_wait_us", "sparse_trie_wait_us"):
vals = [r[key] for r in rows if r[key] is not None]
avg[key] = int(sum(vals) / len(vals)) if vals else None
merged.append(avg)
return merged
def main():
parser = argparse.ArgumentParser(description="Generate benchmark charts")
parser.add_argument(
"--feature", nargs="+", required=True,
help="Path(s) to feature combined_latency.csv",
)
parser.add_argument(
"--output-dir", required=True, help="Output directory for PNG charts"
)
parser.add_argument(
"--baseline", nargs="+", help="Path(s) to baseline combined_latency.csv"
)
parser.add_argument("--baseline-name", default="baseline", help="Label for baseline")
parser.add_argument("--feature-name", "--branch-name", default="feature", help="Label for feature")
args = parser.parse_args()
feature = merge_csvs(args.feature)
if not feature:
print("No results found in feature CSV(s)", file=sys.stderr)
sys.exit(1)
baseline = None
if args.baseline:
baseline = merge_csvs(args.baseline)
if not baseline:
print(
"Warning: no results in baseline CSV(s), skipping comparison",
file=sys.stderr,
)
baseline = None
out_dir = Path(args.output_dir)
out_dir.mkdir(parents=True, exist_ok=True)
bname = args.baseline_name
fname = args.feature_name
plot_latency_and_throughput(feature, baseline, out_dir / "latency_throughput.png", bname, fname)
plot_wait_breakdown(feature, baseline, out_dir / "wait_breakdown.png", bname, fname)
plot_gas_vs_latency(feature, baseline, out_dir / "gas_vs_latency.png", bname, fname)
print(f"Charts written to {out_dir}")
if __name__ == "__main__":
main()

View File

@@ -1,144 +0,0 @@
#!/usr/bin/env bash
#
# Runs a single reth-bench cycle: mount snapshot → start node → warmup →
# benchmark → stop node → recover snapshot.
#
# Usage: bench-reth-run.sh <label> <binary> <output-dir>
#
# Required env: SCHELK_MOUNT, BENCH_RPC_URL, BENCH_BLOCKS, BENCH_WARMUP_BLOCKS
set -euo pipefail
LABEL="$1"
BINARY="$2"
OUTPUT_DIR="$3"
DATADIR="$SCHELK_MOUNT/datadir"
mkdir -p "$OUTPUT_DIR"
LOG="${OUTPUT_DIR}/node.log"
cleanup() {
kill "$TAIL_PID" 2>/dev/null || true
if [ -n "${RETH_PID:-}" ] && sudo kill -0 "$RETH_PID" 2>/dev/null; then
if [ "${BENCH_SAMPLY:-false}" = "true" ]; then
# Send SIGINT to the inner reth process by exact name (not -f which
# would also match samply's cmdline containing "reth"). Samply will
# capture reth's exit and save the profile.
sudo pkill -INT -x reth 2>/dev/null || true
# Wait for samply to finish writing the profile and exit
for i in $(seq 1 120); do
sudo pgrep -x samply > /dev/null 2>&1 || break
if [ $((i % 10)) -eq 0 ]; then
echo "Waiting for samply to finish writing profile... (${i}s)"
fi
sleep 1
done
if sudo pgrep -x samply > /dev/null 2>&1; then
echo "Samply still running after 120s, sending SIGTERM..."
sudo pkill -x samply 2>/dev/null || true
fi
else
sudo kill "$RETH_PID"
for i in $(seq 1 30); do
sudo kill -0 "$RETH_PID" 2>/dev/null || break
sleep 1
done
fi
sudo kill -9 "$RETH_PID" 2>/dev/null || true
sleep 1
fi
# Fix ownership of reth-created files (reth runs as root)
sudo chown -R "$(id -un):$(id -gn)" "$OUTPUT_DIR" 2>/dev/null || true
if mountpoint -q "$SCHELK_MOUNT"; then
sudo umount -l "$SCHELK_MOUNT" || true
sudo schelk recover -y || true
fi
}
TAIL_PID=
trap cleanup EXIT
# Mount
sudo schelk mount -y
sync
sudo sh -c 'echo 3 > /proc/sys/vm/drop_caches'
echo "=== Cache state after drop ==="
free -h
grep Cached /proc/meminfo
# Start reth
# CPU layout: core 0 = OS/IRQs/reth-bench/aux, cores 1+ = reth node
RETH_BENCH="$(which reth-bench)"
ONLINE=$(nproc --all)
MAX_RETH=$(( ONLINE - 1 ))
if [ "${BENCH_CORES:-0}" -gt 0 ] && [ "$BENCH_CORES" -lt "$MAX_RETH" ]; then
MAX_RETH=$BENCH_CORES
fi
RETH_CPUS="1-${MAX_RETH}"
RETH_ARGS=(
node
--datadir "$DATADIR"
--log.file.directory "$OUTPUT_DIR/reth-logs"
--engine.accept-execution-requests-hash
--http
--http.port 8545
--ws
--ws.api all
--authrpc.port 8551
--disable-discovery
--no-persist-peers
)
if [ "${BENCH_SAMPLY:-false}" = "true" ]; then
RETH_ARGS+=(--log.samply)
SAMPLY="$(which samply)"
sudo taskset -c "$RETH_CPUS" nice -n -20 \
"$SAMPLY" record --save-only --presymbolicate --rate 10000 \
--output "$OUTPUT_DIR/samply-profile.json.gz" \
-- "$BINARY" "${RETH_ARGS[@]}" \
> "$LOG" 2>&1 &
else
sudo taskset -c "$RETH_CPUS" nice -n -20 "$BINARY" "${RETH_ARGS[@]}" \
> "$LOG" 2>&1 &
fi
RETH_PID=$!
stdbuf -oL tail -f "$LOG" | sed -u "s/^/[reth] /" &
TAIL_PID=$!
for i in $(seq 1 60); do
if curl -sf http://127.0.0.1:8545 -X POST \
-H 'Content-Type: application/json' \
-d '{"jsonrpc":"2.0","method":"eth_blockNumber","params":[],"id":1}' \
> /dev/null 2>&1; then
echo "reth (${LABEL}) is ready after ${i}s"
break
fi
if [ "$i" -eq 60 ]; then
echo "::error::reth (${LABEL}) failed to start within 60s"
cat "$LOG"
exit 1
fi
sleep 1
done
# Run reth-bench with high priority but as the current user so output
# files are not root-owned (avoids EACCES on next checkout).
BENCH_NICE="sudo nice -n -20 sudo -u $(id -un)"
# Warmup
$BENCH_NICE "$RETH_BENCH" new-payload-fcu \
--rpc-url "$BENCH_RPC_URL" \
--engine-rpc-url http://127.0.0.1:8551 \
--jwt-secret "$DATADIR/jwt.hex" \
--advance "${BENCH_WARMUP_BLOCKS:-50}" \
--reth-new-payload 2>&1 | sed -u "s/^/[bench] /"
# Benchmark
$BENCH_NICE "$RETH_BENCH" new-payload-fcu \
--rpc-url "$BENCH_RPC_URL" \
--engine-rpc-url http://127.0.0.1:8551 \
--jwt-secret "$DATADIR/jwt.hex" \
--advance "$BENCH_BLOCKS" \
--reth-new-payload \
--output "$OUTPUT_DIR" 2>&1 | sed -u "s/^/[bench] /"
# cleanup runs via trap

View File

@@ -1,127 +0,0 @@
#!/usr/bin/env bash
#
# Downloads the latest nightly snapshot into the schelk volume with
# progress reporting to the GitHub PR comment.
#
# Skips the download if the local ETag marker matches the remote one.
#
# Usage: bench-reth-snapshot.sh [--check]
# --check Only check if a download is needed; exits 0 if up-to-date, 1 if not.
#
# Required env:
# SCHELK_MOUNT schelk mount point (e.g. /reth-bench)
# GITHUB_TOKEN token for GitHub API calls (only for download)
# BENCH_COMMENT_ID PR comment ID to update (optional)
# BENCH_REPO owner/repo (e.g. paradigmxyz/reth)
# BENCH_JOB_URL link to the Actions job
# BENCH_ACTOR user who triggered the benchmark
# BENCH_CONFIG config summary line
set -euo pipefail
BUCKET="minio/reth-snapshots/reth-1-minimal-nightly-previous.tar.zst"
DATADIR="$SCHELK_MOUNT/datadir"
ETAG_FILE="$HOME/.reth-bench-snapshot-etag"
# Get remote metadata via JSON for reliable parsing
MC_STAT=$(mc stat --json "$BUCKET" 2>/dev/null || true)
REMOTE_ETAG=$(echo "$MC_STAT" | jq -r '.etag // empty')
if [ -z "$REMOTE_ETAG" ]; then
echo "::warning::Failed to get ETag from mc stat, will re-download"
REMOTE_ETAG="unknown-$(date +%s)"
fi
LOCAL_ETAG=""
[ -f "$ETAG_FILE" ] && LOCAL_ETAG=$(cat "$ETAG_FILE")
if [ "$REMOTE_ETAG" = "$LOCAL_ETAG" ]; then
echo "Snapshot is up-to-date (ETag: ${REMOTE_ETAG})"
if [ "${1:-}" = "--check" ]; then
exit 0
fi
exit 0
fi
echo "Snapshot needs update (local: ${LOCAL_ETAG:-<none>}, remote: ${REMOTE_ETAG})"
if [ "${1:-}" = "--check" ]; then
exit 1
fi
# Get compressed size for progress tracking
TOTAL_BYTES=$(echo "$MC_STAT" | jq -r '.size // empty')
if [ -z "$TOTAL_BYTES" ] || [ "$TOTAL_BYTES" = "0" ]; then
echo "::error::Failed to get snapshot size from mc stat"
exit 1
fi
echo "Snapshot size: $TOTAL_BYTES bytes ($(numfmt --to=iec "$TOTAL_BYTES"))"
# Prepare mount
mountpoint -q "$SCHELK_MOUNT" && sudo schelk recover -y || true
sudo schelk mount -y
sudo rm -rf "$DATADIR"
sudo mkdir -p "$DATADIR"
update_comment() {
local pct="$1"
[ -z "${BENCH_COMMENT_ID:-}" ] && return 0
local status="Building binaries & downloading snapshot… ${pct}%"
local body
body="$(printf 'cc @%s\n\n🚀 Benchmark started! [View job](%s)\n\n⏳ **Status:** %s\n\n%s' \
"$BENCH_ACTOR" "$BENCH_JOB_URL" "$status" "$BENCH_CONFIG")"
curl -sf -X PATCH \
-H "Authorization: token ${GITHUB_TOKEN}" \
-H "Accept: application/vnd.github.v3+json" \
"https://api.github.com/repos/${BENCH_REPO}/issues/comments/${BENCH_COMMENT_ID}" \
-d "$(jq -nc --arg body "$body" '{body: $body}')" \
> /dev/null 2>&1 || true
}
# Track compressed bytes flowing through the pipe
DL_BYTES_FILE=$(mktemp)
echo 0 > "$DL_BYTES_FILE"
# Start progress reporter in background
(
while true; do
sleep 10
CURRENT=$(cat "$DL_BYTES_FILE" 2>/dev/null || echo 0)
if [ "$TOTAL_BYTES" -gt 0 ]; then
PCT=$(( CURRENT * 100 / TOTAL_BYTES ))
[ "$PCT" -gt 100 ] && PCT=100
echo "Snapshot download: $(numfmt --to=iec "$CURRENT") / $(numfmt --to=iec "$TOTAL_BYTES") (${PCT}%)"
update_comment "$PCT"
fi
done
) &
PROGRESS_PID=$!
trap 'kill $PROGRESS_PID 2>/dev/null || true; rm -f "$DL_BYTES_FILE"' EXIT
# Download and extract; python byte counter tracks compressed bytes received
mc cat "$BUCKET" | python3 -c "
import sys
count = 0
while True:
data = sys.stdin.buffer.read(1048576)
if not data:
break
count += len(data)
sys.stdout.buffer.write(data)
with open('$DL_BYTES_FILE', 'w') as f:
f.write(str(count))
" | pzstd -d -p 6 | sudo tar -xf - -C "$DATADIR"
# Stop progress reporter
kill $PROGRESS_PID 2>/dev/null || true
wait $PROGRESS_PID 2>/dev/null || true
update_comment "100"
echo "Snapshot download complete"
# Promote the new snapshot to become the schelk baseline (virgin volume).
# This copies changed blocks from scratch → virgin so that future
# `schelk recover` calls restore to this new state.
sync
sudo schelk promote -y
# Save ETag marker
echo "$REMOTE_ETAG" > "$ETAG_FILE"
echo "Snapshot promoted to schelk baseline (ETag: ${REMOTE_ETAG})"

View File

@@ -1,553 +0,0 @@
#!/usr/bin/env python3
"""Parse reth-bench CSV output and generate a summary JSON + markdown comparison.
Usage:
bench-reth-summary.py <combined_csv> <gas_csv> \
--output-summary <summary.json> \
--output-markdown <comment.md> \
--baseline-csv <baseline_combined.csv> \
[--repo <owner/repo>] \
[--baseline-ref <sha>] \
[--feature-name <name>] \
[--feature-sha <sha>]
Generates a paired statistical comparison between baseline and feature.
Matches blocks by number and computes per-block diffs to cancel out gas
variance. Fails if baseline or feature CSV is missing or empty.
"""
import argparse
import csv
import json
import math
import random
import sys
GIGAGAS = 1_000_000_000
T_CRITICAL = 1.96 # two-tailed 95% confidence
BOOTSTRAP_ITERATIONS = 10_000
def _opt_int(row: dict, key: str) -> int | None:
"""Return int value for a CSV field, or None if missing/empty."""
v = row.get(key)
if v is None or v == "":
return None
return int(v)
def parse_combined_csv(path: str) -> list[dict]:
"""Parse combined_latency.csv into a list of per-block dicts."""
rows = []
with open(path) as f:
reader = csv.DictReader(f)
for row in reader:
rows.append(
{
"block_number": int(row["block_number"]),
"gas_used": int(row["gas_used"]),
"gas_limit": int(row["gas_limit"]),
"transaction_count": int(row["transaction_count"]),
"new_payload_latency_us": int(row["new_payload_latency"]),
"fcu_latency_us": int(row["fcu_latency"]),
"total_latency_us": int(row["total_latency"]),
"persistence_wait_us": _opt_int(row, "persistence_wait"),
"execution_cache_wait_us": _opt_int(row, "execution_cache_wait"),
"sparse_trie_wait_us": _opt_int(row, "sparse_trie_wait"),
}
)
return rows
def parse_gas_csv(path: str) -> list[dict]:
"""Parse total_gas.csv into a list of per-block dicts."""
rows = []
with open(path) as f:
reader = csv.DictReader(f)
for row in reader:
rows.append(
{
"block_number": int(row["block_number"]),
"gas_used": int(row["gas_used"]),
"time_us": int(row["time"]),
}
)
return rows
def stddev(values: list[float], mean: float) -> float:
if len(values) < 2:
return 0.0
return math.sqrt(sum((v - mean) ** 2 for v in values) / (len(values) - 1))
def percentile(sorted_vals: list[float], pct: int) -> float:
if not sorted_vals:
return 0.0
idx = int(len(sorted_vals) * pct / 100)
idx = min(idx, len(sorted_vals) - 1)
return sorted_vals[idx]
def compute_stats(combined: list[dict]) -> dict:
"""Compute per-run statistics from parsed CSV data."""
n = len(combined)
if n == 0:
return {}
latencies_ms = [r["new_payload_latency_us"] / 1_000 for r in combined]
sorted_lat = sorted(latencies_ms)
mean_lat = sum(latencies_ms) / n
std_lat = stddev(latencies_ms, mean_lat)
mgas_s_values = []
for r in combined:
lat_s = r["new_payload_latency_us"] / 1_000_000
if lat_s > 0:
mgas_s_values.append(r["gas_used"] / lat_s / 1_000_000)
mean_mgas_s = sum(mgas_s_values) / len(mgas_s_values) if mgas_s_values else 0
return {
"n": n,
"mean_ms": mean_lat,
"stddev_ms": std_lat,
"p50_ms": percentile(sorted_lat, 50),
"p90_ms": percentile(sorted_lat, 90),
"p99_ms": percentile(sorted_lat, 99),
"mean_mgas_s": mean_mgas_s,
}
def compute_wait_stats(combined: list[dict], field: str) -> dict:
"""Compute mean/p50/p95 for a wait time field (in ms)."""
values_ms = []
for r in combined:
v = r.get(field)
if v is not None:
values_ms.append(v / 1_000)
if not values_ms:
return {}
n = len(values_ms)
mean_val = sum(values_ms) / n
sorted_vals = sorted(values_ms)
return {
"mean_ms": mean_val,
"p50_ms": percentile(sorted_vals, 50),
"p95_ms": percentile(sorted_vals, 95),
}
def _paired_data(
baseline: list[dict], feature: list[dict]
) -> tuple[list[tuple[float, float]], list[float], list[float]]:
"""Match blocks and return paired latencies and per-block diffs.
Returns:
pairs: list of (baseline_ms, feature_ms) tuples
lat_diffs_ms: list of feature baseline latency diffs in ms
mgas_diffs: list of feature baseline Mgas/s diffs
"""
baseline_by_block = {r["block_number"]: r for r in baseline}
feature_by_block = {r["block_number"]: r for r in feature}
common_blocks = sorted(set(baseline_by_block) & set(feature_by_block))
pairs = []
lat_diffs_ms = []
mgas_diffs = []
for bn in common_blocks:
b = baseline_by_block[bn]
f = feature_by_block[bn]
b_ms = b["new_payload_latency_us"] / 1_000
f_ms = f["new_payload_latency_us"] / 1_000
pairs.append((b_ms, f_ms))
lat_diffs_ms.append(f_ms - b_ms)
b_lat_s = b["new_payload_latency_us"] / 1_000_000
f_lat_s = f["new_payload_latency_us"] / 1_000_000
if b_lat_s > 0 and f_lat_s > 0:
mgas_diffs.append(
f["gas_used"] / f_lat_s / 1_000_000
- b["gas_used"] / b_lat_s / 1_000_000
)
return pairs, lat_diffs_ms, mgas_diffs
def compute_paired_stats(
baseline_runs: list[list[dict]],
feature_runs: list[list[dict]],
) -> dict:
"""Compute paired statistics between baseline and feature runs.
Each pair (baseline_runs[i], feature_runs[i]) produces per-block diffs.
All diffs are pooled for the final CI.
"""
all_pairs = []
all_lat_diffs = []
all_mgas_diffs = []
blocks_per_pair = []
for baseline, feature in zip(baseline_runs, feature_runs):
pairs, lat_diffs, mgas_diffs = _paired_data(baseline, feature)
all_pairs.extend(pairs)
all_lat_diffs.extend(lat_diffs)
all_mgas_diffs.extend(mgas_diffs)
blocks_per_pair.append(len(pairs))
if not all_lat_diffs:
return {}
n = len(all_lat_diffs)
mean_diff = sum(all_lat_diffs) / n
std_diff = stddev(all_lat_diffs, mean_diff)
se = std_diff / math.sqrt(n) if n > 0 else 0.0
ci = T_CRITICAL * se
# Bootstrap CI on difference-of-percentiles (resample paired blocks)
base_lats = sorted([p[0] for p in all_pairs])
feature_lats = sorted([p[1] for p in all_pairs])
p50_diff = percentile(feature_lats, 50) - percentile(base_lats, 50)
p90_diff = percentile(feature_lats, 90) - percentile(base_lats, 90)
p99_diff = percentile(feature_lats, 99) - percentile(base_lats, 99)
rng = random.Random(42)
p50_boot, p90_boot, p99_boot = [], [], []
for _ in range(BOOTSTRAP_ITERATIONS):
sample = rng.choices(all_pairs, k=n)
b_sorted = sorted(p[0] for p in sample)
f_sorted = sorted(p[1] for p in sample)
p50_boot.append(percentile(f_sorted, 50) - percentile(b_sorted, 50))
p90_boot.append(percentile(f_sorted, 90) - percentile(b_sorted, 90))
p99_boot.append(percentile(f_sorted, 99) - percentile(b_sorted, 99))
p50_boot.sort()
p90_boot.sort()
p99_boot.sort()
lo = int(BOOTSTRAP_ITERATIONS * 0.025)
hi = int(BOOTSTRAP_ITERATIONS * 0.975)
mean_mgas_diff = sum(all_mgas_diffs) / len(all_mgas_diffs) if all_mgas_diffs else 0.0
std_mgas_diff = stddev(all_mgas_diffs, mean_mgas_diff) if len(all_mgas_diffs) > 1 else 0.0
mgas_se = std_mgas_diff / math.sqrt(len(all_mgas_diffs)) if all_mgas_diffs else 0.0
mgas_ci = T_CRITICAL * mgas_se
return {
"n": n,
"mean_diff_ms": mean_diff,
"ci_ms": ci,
"p50_diff_ms": p50_diff,
"p50_ci_ms": (p50_boot[hi] - p50_boot[lo]) / 2,
"p90_diff_ms": p90_diff,
"p90_ci_ms": (p90_boot[hi] - p90_boot[lo]) / 2,
"p99_diff_ms": p99_diff,
"p99_ci_ms": (p99_boot[hi] - p99_boot[lo]) / 2,
"mean_mgas_diff": mean_mgas_diff,
"mgas_ci": mgas_ci,
"blocks": max(blocks_per_pair),
}
def format_duration(seconds: float) -> str:
if seconds >= 60:
return f"{seconds / 60:.1f}min"
return f"{seconds}s"
def format_gas(gas: int) -> str:
if gas >= GIGAGAS:
return f"{gas / GIGAGAS:.1f}G"
if gas >= 1_000_000:
return f"{gas / 1_000_000:.1f}M"
return f"{gas:,}"
def fmt_ms(v: float) -> str:
return f"{v:.2f}ms"
def fmt_mgas(v: float) -> str:
return f"{v:.2f}"
def significance(pct: float, ci_pct: float, lower_is_better: bool) -> str:
"""Return significance label: 'good', 'bad', or 'neutral'."""
significant = abs(pct) > ci_pct
if not significant:
return "neutral"
elif (pct < 0) == lower_is_better:
return "good"
else:
return "bad"
def change_str(pct: float, ci_pct: float, lower_is_better: bool) -> str:
"""Format change% with paired CI significance.
Significant if the CI doesn't cross zero (i.e. |pct| > ci_pct).
"""
sig = significance(pct, ci_pct, lower_is_better)
emoji = {"good": "", "bad": "", "neutral": ""}[sig]
return f"{pct:+.2f}% {emoji}{ci_pct:.2f}%)"
def compute_changes(
baseline_stats: dict, feature_stats: dict, paired_stats: dict
) -> dict:
"""Pre-compute change percentages and significance for each metric."""
def pct(base: float, feat: float) -> float:
return (feat - base) / base * 100.0 if base > 0 else 0.0
def ci_pct(ci_ms: float, base_ms: float) -> float:
return ci_ms / base_ms * 100.0 if base_ms > 0 else 0.0
metrics = [
("mean", "mean_ms", "ci_ms", "mean_ms", True),
("p50", "p50_ms", "p50_ci_ms", "p50_ms", True),
("p90", "p90_ms", "p90_ci_ms", "p90_ms", True),
("p99", "p99_ms", "p99_ci_ms", "p99_ms", True),
("mgas_s", "mean_mgas_s", "mgas_ci", "mean_mgas_s", False),
]
changes = {}
for name, stat_key, ci_key, base_key, lower_is_better in metrics:
p = pct(baseline_stats[stat_key], feature_stats[stat_key])
c = ci_pct(paired_stats[ci_key], baseline_stats[base_key])
changes[name] = {
"pct": round(p, 4),
"ci_pct": round(c, 4),
"sig": significance(p, c, lower_is_better),
}
return changes
def generate_comparison_table(
run1: dict,
run2: dict,
paired: dict,
repo: str,
baseline_ref: str,
baseline_name: str,
feature_name: str,
feature_sha: str,
) -> str:
"""Generate a markdown comparison table between baseline and feature."""
n = paired["blocks"]
def pct(base: float, feat: float) -> float:
return (feat - base) / base * 100.0 if base > 0 else 0.0
mean_pct = pct(run1["mean_ms"], run2["mean_ms"])
gas_pct = pct(run1["mean_mgas_s"], run2["mean_mgas_s"])
p50_pct = pct(run1["p50_ms"], run2["p50_ms"])
p90_pct = pct(run1["p90_ms"], run2["p90_ms"])
p99_pct = pct(run1["p99_ms"], run2["p99_ms"])
# Bootstrap CIs as % of baseline percentile
p50_ci_pct = paired["p50_ci_ms"] / run1["p50_ms"] * 100.0 if run1["p50_ms"] > 0 else 0.0
p90_ci_pct = paired["p90_ci_ms"] / run1["p90_ms"] * 100.0 if run1["p90_ms"] > 0 else 0.0
p99_ci_pct = paired["p99_ci_ms"] / run1["p99_ms"] * 100.0 if run1["p99_ms"] > 0 else 0.0
# CI as a percentage of baseline mean
lat_ci_pct = paired["ci_ms"] / run1["mean_ms"] * 100.0 if run1["mean_ms"] > 0 else 0.0
mgas_ci_pct = paired["mgas_ci"] / run1["mean_mgas_s"] * 100.0 if run1["mean_mgas_s"] > 0 else 0.0
base_url = f"https://github.com/{repo}/commit"
baseline_label = f"[`{baseline_name}`]({base_url}/{baseline_ref})"
feature_label = f"[`{feature_name}`]({base_url}/{feature_sha})"
lines = [
f"| Metric | {baseline_label} | {feature_label} | Change |",
"|--------|------|--------|--------|",
f"| Mean | {fmt_ms(run1['mean_ms'])} | {fmt_ms(run2['mean_ms'])} | {change_str(mean_pct, lat_ci_pct, lower_is_better=True)} |",
f"| StdDev | {fmt_ms(run1['stddev_ms'])} | {fmt_ms(run2['stddev_ms'])} | |",
f"| P50 | {fmt_ms(run1['p50_ms'])} | {fmt_ms(run2['p50_ms'])} | {change_str(p50_pct, p50_ci_pct, lower_is_better=True)} |",
f"| P90 | {fmt_ms(run1['p90_ms'])} | {fmt_ms(run2['p90_ms'])} | {change_str(p90_pct, p90_ci_pct, lower_is_better=True)} |",
f"| P99 | {fmt_ms(run1['p99_ms'])} | {fmt_ms(run2['p99_ms'])} | {change_str(p99_pct, p99_ci_pct, lower_is_better=True)} |",
f"| Mgas/s | {fmt_mgas(run1['mean_mgas_s'])} | {fmt_mgas(run2['mean_mgas_s'])} | {change_str(gas_pct, mgas_ci_pct, lower_is_better=False)} |",
"",
f"*{n} blocks*",
]
return "\n".join(lines)
def generate_wait_time_table(
title: str,
baseline_stats: dict,
feature_stats: dict,
baseline_label: str,
feature_label: str,
) -> str:
"""Generate a markdown table for a wait time metric."""
if not baseline_stats or not feature_stats:
return ""
lines = [
f"### {title}",
"",
f"| Metric | {baseline_label} | {feature_label} |",
"|--------|------|--------|",
f"| Mean | {fmt_ms(baseline_stats['mean_ms'])} | {fmt_ms(feature_stats['mean_ms'])} |",
f"| P50 | {fmt_ms(baseline_stats['p50_ms'])} | {fmt_ms(feature_stats['p50_ms'])} |",
f"| P95 | {fmt_ms(baseline_stats['p95_ms'])} | {fmt_ms(feature_stats['p95_ms'])} |",
]
return "\n".join(lines)
def generate_markdown(
summary: dict, comparison_table: str,
wait_time_tables: list[str] | None = None,
behind_baseline: int = 0, repo: str = "", baseline_ref: str = "", baseline_name: str = "",
) -> str:
"""Generate a markdown comment body."""
lines = ["## Benchmark Results", ""]
if behind_baseline > 0:
s = "s" if behind_baseline > 1 else ""
diff_link = f"https://github.com/{repo}/compare/{baseline_ref[:12]}...{baseline_name}"
lines.append(f"> ⚠️ Feature is [**{behind_baseline} commit{s} behind `{baseline_name}`**]({diff_link}). Consider rebasing for accurate results.")
lines.append("")
lines.append(comparison_table)
if wait_time_tables:
lines.append("")
lines.append("<details>")
lines.append("<summary>Wait Time Breakdown</summary>")
lines.append("")
for table in wait_time_tables:
if table:
lines.append(table)
lines.append("")
lines.append("</details>")
return "\n".join(lines)
def main():
parser = argparse.ArgumentParser(description="Parse reth-bench ABBA results")
parser.add_argument(
"--baseline-csv", nargs="+", required=True,
help="Baseline combined_latency.csv files (A1, A2)",
)
parser.add_argument(
"--feature-csv", "--branch-csv", nargs="+", required=True,
help="Feature combined_latency.csv files (B1, B2)",
)
parser.add_argument("--gas-csv", required=True, help="Path to total_gas.csv")
parser.add_argument(
"--output-summary", required=True, help="Output JSON summary path"
)
parser.add_argument("--output-markdown", required=True, help="Output markdown path")
parser.add_argument(
"--repo", default="paradigmxyz/reth", help="GitHub repo (owner/name)"
)
parser.add_argument("--baseline-ref", default=None, help="Baseline commit SHA")
parser.add_argument("--baseline-name", default=None, help="Baseline display name")
parser.add_argument("--feature-name", "--branch-name", default=None, help="Feature branch name")
parser.add_argument("--feature-ref", "--branch-sha", "--feature-sha", default=None, help="Feature commit SHA")
parser.add_argument("--behind-baseline", "--behind-main", type=int, default=0, help="Commits behind baseline")
args = parser.parse_args()
if len(args.baseline_csv) != len(args.feature_csv):
print("Must provide equal number of baseline and feature CSVs", file=sys.stderr)
sys.exit(1)
baseline_runs = []
feature_runs = []
for path in args.baseline_csv:
data = parse_combined_csv(path)
if not data:
print(f"No results in {path}", file=sys.stderr)
sys.exit(1)
baseline_runs.append(data)
for path in args.feature_csv:
data = parse_combined_csv(path)
if not data:
print(f"No results in {path}", file=sys.stderr)
sys.exit(1)
feature_runs.append(data)
gas = parse_gas_csv(args.gas_csv)
all_baseline = [r for run in baseline_runs for r in run]
all_feature = [r for run in feature_runs for r in run]
baseline_stats = compute_stats(all_baseline)
feature_stats = compute_stats(all_feature)
paired_stats = compute_paired_stats(baseline_runs, feature_runs)
if not paired_stats:
print("No common blocks between baseline and feature runs", file=sys.stderr)
sys.exit(1)
baseline_ref = args.baseline_ref or "main"
baseline_name = args.baseline_name or "baseline"
feature_name = args.feature_name or "feature"
feature_sha = args.feature_ref or "unknown"
comparison_table = generate_comparison_table(
baseline_stats,
feature_stats,
paired_stats,
repo=args.repo,
baseline_ref=baseline_ref,
baseline_name=baseline_name,
feature_name=feature_name,
feature_sha=feature_sha,
)
print(f"Generated comparison ({paired_stats['n']} paired blocks, "
f"mean diff {paired_stats['mean_diff_ms']:+.3f}ms ± {paired_stats['ci_ms']:.3f}ms)")
base_url = f"https://github.com/{args.repo}/commit"
baseline_label = f"[`{baseline_name}`]({base_url}/{baseline_ref})"
feature_label = f"[`{feature_name}`]({base_url}/{feature_sha})"
wait_fields = [
("persistence_wait_us", "Persistence Wait"),
("sparse_trie_wait_us", "Trie Cache Update Wait"),
("execution_cache_wait_us", "Execution Cache Update Wait"),
]
wait_time_tables = []
wait_time_data = {}
for field, title in wait_fields:
b_stats = compute_wait_stats(all_baseline, field)
f_stats = compute_wait_stats(all_feature, field)
if b_stats and f_stats:
wait_time_data[field] = {
"title": title,
"baseline": b_stats,
"feature": f_stats,
}
table = generate_wait_time_table(title, b_stats, f_stats, baseline_label, feature_label)
if table:
wait_time_tables.append(table)
summary = {
"blocks": paired_stats["blocks"],
"baseline": {
"name": baseline_name,
"ref": baseline_ref,
"stats": baseline_stats,
},
"feature": {
"name": feature_name,
"ref": feature_sha,
"stats": feature_stats,
},
"paired": paired_stats,
"changes": compute_changes(baseline_stats, feature_stats, paired_stats),
"wait_times": wait_time_data,
}
with open(args.output_summary, "w") as f:
json.dump(summary, f, indent=2)
print(f"Summary written to {args.output_summary}")
markdown = generate_markdown(
summary, comparison_table,
wait_time_tables=wait_time_tables,
behind_baseline=args.behind_baseline,
repo=args.repo,
baseline_ref=baseline_ref,
baseline_name=baseline_name,
)
with open(args.output_markdown, "w") as f:
f.write(markdown)
print(f"Markdown written to {args.output_markdown}")
if __name__ == "__main__":
main()

View File

@@ -1,339 +0,0 @@
// Sends Slack notifications for reth-bench results.
//
// Reads from environment:
// SLACK_BENCH_BOT_TOKEN Slack Bot User OAuth Token (xoxb-...)
// SLACK_BENCH_CHANNEL Public channel ID for significant improvements
// BENCH_WORK_DIR Directory containing summary.json
// BENCH_PR PR number (may be empty)
// BENCH_ACTOR GitHub user who triggered the bench
// BENCH_JOB_URL URL to the Actions job page
// BENCH_SAMPLY 'true' if samply profiling was enabled
//
// Usage from actions/github-script:
// const notify = require('./.github/scripts/bench-slack-notify.js');
// await notify.success({ core, context });
// await notify.failure({ core, context, failedStep: '...' });
const fs = require('fs');
const path = require('path');
const SLACK_API = 'https://slack.com/api/chat.postMessage';
function loadSlackUsers(repoRoot) {
try {
const raw = fs.readFileSync(path.join(repoRoot, '.github', 'scripts', 'bench-slack-users.json'), 'utf8');
const data = JSON.parse(raw);
// Filter out non-user-ID entries (like _comment)
const users = {};
for (const [k, v] of Object.entries(data)) {
if (!k.startsWith('_') && typeof v === 'string' && v.startsWith('U')) {
users[k] = v;
}
}
return users;
} catch {
return {};
}
}
async function postToSlack(token, channel, blocks, text, core, threadTs) {
const payload = { channel, blocks, text, unfurl_links: false };
if (threadTs) payload.thread_ts = threadTs;
const resp = await fetch(SLACK_API, {
method: 'POST',
headers: {
'Authorization': `Bearer ${token}`,
'Content-Type': 'application/json',
},
body: JSON.stringify(payload),
});
const data = await resp.json();
if (!data.ok) {
core.warning(`Slack API error (channel ${channel}): ${JSON.stringify(data)}`);
}
return data;
}
function cell(text) {
const s = String(text);
return { type: 'raw_text', text: s || ' ' };
}
function buildSuccessBlocks({ summary, prNumber, actor, actorSlackId, jobUrl, repo, samplyUrls }) {
const b = summary.baseline.stats;
const f = summary.feature.stats;
const c = summary.changes;
const sigEmoji = { good: '\u2705', bad: '\u274c', neutral: '\u26aa' };
function fmtMs(v) { return v.toFixed(2) + 'ms'; }
function fmtMgas(v) { return v.toFixed(2); }
function fmtChange(ch) {
if (!ch.pct && !ch.ci_pct) return ' ';
const pctStr = `${ch.pct >= 0 ? '+' : ''}${ch.pct.toFixed(2)}%`;
const ciStr = ch.ci_pct ? ` (\u00b1${ch.ci_pct.toFixed(2)}%)` : '';
return `${pctStr}${ciStr} ${sigEmoji[ch.sig]}`;
}
// Overall result for header
const vals = Object.values(c);
const hasBad = vals.some(v => v.sig === 'bad');
const hasGood = vals.some(v => v.sig === 'good');
let headerEmoji, headerResult;
if (hasBad && hasGood) {
headerEmoji = ':warning:';
headerResult = 'Mixed Results';
} else if (hasBad) {
headerEmoji = ':x:';
headerResult = 'Regression';
} else if (hasGood) {
headerEmoji = ':white_check_mark:';
headerResult = 'Improvement';
} else {
headerEmoji = ':white_circle:';
headerResult = 'No Difference';
}
const prUrl = prNumber ? `https://github.com/${repo}/pull/${prNumber}` : '';
const commitUrl = `https://github.com/${repo}/commit`;
const baselineLink = `<${commitUrl}/${summary.baseline.ref}|${summary.baseline.name}>`;
const featureLink = `<${commitUrl}/${summary.feature.ref}|${summary.feature.name}>`;
// Meta line
const metaParts = [];
if (prNumber) metaParts.push(`*<${prUrl}|PR #${prNumber}>*`);
metaParts.push(`triggered by ${actorSlackId ? `<@${actorSlackId}>` : `@${actor}`}`);
// Baseline/feature lines with samply profile links
let baselineLine = `*Baseline:* ${baselineLink}`;
const bl1 = samplyUrls['baseline-1'];
const bl2 = samplyUrls['baseline-2'];
if (bl1) baselineLine += ` | <${bl1}|Samply 1>`;
if (bl2) baselineLine += ` | <${bl2}|Samply 2>`;
let featureLine = `*Feature:* ${featureLink}`;
const fl1 = samplyUrls['feature-1'];
const fl2 = samplyUrls['feature-2'];
if (fl1) featureLine += ` | <${fl1}|Samply 1>`;
if (fl2) featureLine += ` | <${fl2}|Samply 2>`;
const warmup = summary.warmup_blocks || process.env.BENCH_WARMUP_BLOCKS || '';
const countsLine = warmup
? `*Warmup:* ${warmup} | *Blocks:* ${summary.blocks}`
: `*Blocks:* ${summary.blocks}`;
const sectionText = [metaParts.join(' | '), '', baselineLine, featureLine, countsLine].join('\n');
// Action buttons
const diffUrl = `https://github.com/${repo}/compare/${summary.baseline.ref}...${summary.feature.ref}`;
const buttons = [
{
type: 'button',
text: { type: 'plain_text', text: 'CI :github:', emoji: true },
url: jobUrl,
action_id: 'ci_button',
},
{
type: 'button',
text: { type: 'plain_text', text: 'Diff :github:', emoji: true },
url: diffUrl,
action_id: 'diff_button',
},
];
const blocks = [
{
type: 'header',
text: { type: 'plain_text', text: `${headerEmoji} ${headerResult}`, emoji: true },
},
{
type: 'section',
text: { type: 'mrkdwn', text: sectionText },
},
{
type: 'table',
column_settings: [
{ align: 'left' },
{ align: 'right' },
{ align: 'right' },
{ align: 'right' },
],
rows: [
[cell('Metric'), cell('Baseline'), cell('Feature'), cell('Change')],
[cell('Mean'), cell(fmtMs(b.mean_ms)), cell(fmtMs(f.mean_ms)), cell(fmtChange(c.mean))],
[cell('StdDev'), cell(fmtMs(b.stddev_ms)), cell(fmtMs(f.stddev_ms)), cell(' ')],
[cell('P50'), cell(fmtMs(b.p50_ms)), cell(fmtMs(f.p50_ms)), cell(fmtChange(c.p50))],
[cell('P90'), cell(fmtMs(b.p90_ms)), cell(fmtMs(f.p90_ms)), cell(fmtChange(c.p90))],
[cell('P99'), cell(fmtMs(b.p99_ms)), cell(fmtMs(f.p99_ms)), cell(fmtChange(c.p99))],
[cell('Mgas/s'), cell(fmtMgas(b.mean_mgas_s)), cell(fmtMgas(f.mean_mgas_s)), cell(fmtChange(c.mgas_s))],
],
},
{
type: 'actions',
elements: buttons,
},
];
// Wait times as a separate table block (sent as threaded reply due to Slack one-table limit)
const threadBlocks = [];
const waitTimes = summary.wait_times || {};
const waitKeys = Object.keys(waitTimes);
if (waitKeys.length > 0) {
const waitRows = [
[cell('Wait Time'), cell('Baseline'), cell('Feature')],
];
for (const key of waitKeys) {
const wt = waitTimes[key];
waitRows.push([cell(wt.title), cell(fmtMs(wt.baseline.mean_ms)), cell(fmtMs(wt.feature.mean_ms))]);
}
threadBlocks.push({
type: 'table',
column_settings: [
{ align: 'left' },
{ align: 'right' },
{ align: 'right' },
],
rows: waitRows,
});
}
return { blocks, threadBlocks };
}
function buildFailureBlocks({ prNumber, actor, actorSlackId, jobUrl, repo, failedStep }) {
const prUrl = prNumber ? `https://github.com/${repo}/pull/${prNumber}` : '';
const actorMention = actorSlackId ? `<@${actorSlackId}>` : `@${actor}`;
const parts = [
prNumber ? `*<${prUrl}|PR #${prNumber}>*` : '',
`by ${actorMention}`,
`failed while *${failedStep}*`,
].filter(Boolean);
const buttons = [
{
type: 'button',
text: { type: 'plain_text', text: 'CI :github:', emoji: true },
url: jobUrl,
action_id: 'ci_button',
},
];
return [
{
type: 'header',
text: { type: 'plain_text', text: ':rotating_light: Bench Failed', emoji: true },
},
{
type: 'section',
text: { type: 'mrkdwn', text: parts.join(' | ') },
},
{
type: 'actions',
elements: buttons,
},
];
}
async function success({ core, context }) {
const token = process.env.SLACK_BENCH_BOT_TOKEN;
if (!token) {
core.info('SLACK_BENCH_BOT_TOKEN not set, skipping Slack notification');
return;
}
let summary;
try {
summary = JSON.parse(fs.readFileSync(process.env.BENCH_WORK_DIR + '/summary.json', 'utf8'));
} catch (e) {
core.warning('Could not read summary.json for Slack notification');
return;
}
const repo = `${context.repo.owner}/${context.repo.repo}`;
const prNumber = process.env.BENCH_PR;
const actor = process.env.BENCH_ACTOR;
const jobUrl = process.env.BENCH_JOB_URL ||
`${context.serverUrl}/${context.repo.owner}/${context.repo.repo}/actions/runs/${context.runId}`;
// Load samply profile URLs (files exist when samply profiling was enabled)
const samplyUrls = {};
for (const run of ['baseline-1', 'baseline-2', 'feature-1', 'feature-2']) {
try {
const url = fs.readFileSync(
path.join(process.env.BENCH_WORK_DIR, run, 'samply-profile-url.txt'), 'utf8'
).trim();
if (url) samplyUrls[run] = url;
} catch {}
}
const slackUsers = loadSlackUsers(process.env.GITHUB_WORKSPACE || '.');
const actorSlackId = slackUsers[actor];
const { blocks, threadBlocks } = buildSuccessBlocks({ summary, prNumber, actor, actorSlackId, jobUrl, repo, samplyUrls });
const text = `Bench: ${summary.baseline.name} vs ${summary.feature.name}`;
async function sendWithThread(ch) {
const res = await postToSlack(token, ch, blocks, text, core);
if (res.ok && res.ts && threadBlocks.length > 0) {
for (const tb of threadBlocks) {
await postToSlack(token, ch, [tb], 'Wait time breakdown', core, res.ts);
}
}
}
// Post to public channel if any metric shows significant improvement or regression
const channel = process.env.SLACK_BENCH_CHANNEL;
let postedToChannel = false;
if (channel) {
const changes = summary.changes || {};
const hasImprovement = Object.values(changes).some(c => c.sig === 'good');
if (hasImprovement) {
await sendWithThread(channel);
postedToChannel = true;
} else {
core.info('No significant improvement, skipping public channel notification');
}
}
// DM the actor only when results were not posted to the public channel
if (!postedToChannel) {
if (actorSlackId) {
await sendWithThread(actorSlackId);
} else {
core.info(`No Slack user mapping for GitHub user '${actor}', skipping DM`);
}
} else {
core.info(`Results posted to channel, skipping DM to ${actor}`);
}
}
async function failure({ core, context, failedStep }) {
const token = process.env.SLACK_BENCH_BOT_TOKEN;
if (!token) {
core.info('SLACK_BENCH_BOT_TOKEN not set, skipping Slack notification');
return;
}
const repo = `${context.repo.owner}/${context.repo.repo}`;
const prNumber = process.env.BENCH_PR;
const actor = process.env.BENCH_ACTOR;
const jobUrl = process.env.BENCH_JOB_URL ||
`${context.serverUrl}/${context.repo.owner}/${context.repo.repo}/actions/runs/${context.runId}`;
const slackUsers = loadSlackUsers(process.env.GITHUB_WORKSPACE || '.');
const actorSlackId = slackUsers[actor];
const blocks = buildFailureBlocks({ prNumber, actor, actorSlackId, jobUrl, repo, failedStep });
const text = `Bench failed while ${failedStep}`;
// Always DM the actor
if (actorSlackId) {
await postToSlack(token, actorSlackId, blocks, text, core);
} else {
core.info(`No Slack user mapping for GitHub user '${actor}', skipping DM`);
}
// Only DM for failures, don't post to public channel
}
module.exports = { success, failure };

View File

@@ -1,13 +0,0 @@
{
"_comment": "Maps GitHub usernames to Slack user IDs. Find yours: Slack profile > ··· > Copy member ID.",
"shekhirin": "U09FAL2UMLJ",
"mattsse": "U09FQNPMRT3",
"klkvr": "U09FAK95FC2",
"joshieDo": "U09LHN6GYAU",
"mediocregopher": "U09FF75KMQU",
"yongkangc": "U09FB0ECTD4",
"gakonst": "U092SEPDM40",
"Rjected": "U09F6SCKRGT",
"DaniPopes": "U09FAT8EK2A",
"emmajam": "U0A34UN92HW"
}

View File

@@ -1,27 +0,0 @@
// Updates the reth-bench PR comment with current status.
//
// Reads from environment:
// BENCH_COMMENT_ID GitHub comment ID to update
// BENCH_JOB_URL URL to the Actions job page
// BENCH_CONFIG Config line (blocks, warmup, refs)
// BENCH_ACTOR User who triggered the benchmark
//
// Usage from actions/github-script:
// const s = require('./.github/scripts/bench-update-status.js');
// await s({github, context, status: 'Building baseline binary...'});
function buildBody(status) {
return `cc @${process.env.BENCH_ACTOR}\n\n🚀 Benchmark started! [View job](${process.env.BENCH_JOB_URL})\n\n⏳ **Status:** ${status}\n\n${process.env.BENCH_CONFIG}`;
}
async function updateStatus({ github, context, status }) {
await github.rest.issues.updateComment({
owner: context.repo.owner,
repo: context.repo.repo,
comment_id: parseInt(process.env.BENCH_COMMENT_ID),
body: buildBody(status),
});
}
updateStatus.buildBody = buildBody;
module.exports = updateStatus;

View File

@@ -59,6 +59,10 @@ engine-auth: [ ]
#
# System contract tests (already fixed and deployed):
#
# tests/prague/eip6110_deposits/test_modified_contract.py::test_invalid_layout and test_invalid_log_length
# System contract is already fixed and deployed; tests cover scenarios where contract is
# malformed which can't happen retroactively. No point in adding checks.
#
# tests/prague/eip7002_el_triggerable_withdrawals/test_contract_deployment.py::test_system_contract_deployment
# tests/prague/eip7251_consolidations/test_contract_deployment.py::test_system_contract_deployment
# Post-fork system contract deployment tests. Should fix for spec compliance but not realistic
@@ -67,8 +71,32 @@ eels/consume-engine:
- tests/prague/eip7702_set_code_tx/test_set_code_txs.py::test_set_code_to_non_empty_storage[fork_Prague-blockchain_test_engine-zero_nonce]-reth
- tests/prague/eip7251_consolidations/test_contract_deployment.py::test_system_contract_deployment[fork_CancunToPragueAtTime15k-blockchain_test_engine-deploy_after_fork-nonzero_balance]-reth
- tests/prague/eip7251_consolidations/test_contract_deployment.py::test_system_contract_deployment[fork_CancunToPragueAtTime15k-blockchain_test_engine-deploy_after_fork-zero_balance]-reth
- tests/prague/eip6110_deposits/test_modified_contract.py::test_invalid_layout[fork_Prague-blockchain_test_engine-log_argument_amount_offset-value_zero]-reth
- tests/prague/eip6110_deposits/test_modified_contract.py::test_invalid_layout[fork_Prague-blockchain_test_engine-log_argument_amount_size-value_zero]-reth
- tests/prague/eip6110_deposits/test_modified_contract.py::test_invalid_layout[fork_Prague-blockchain_test_engine-log_argument_index_offset-value_zero]-reth
- tests/prague/eip6110_deposits/test_modified_contract.py::test_invalid_layout[fork_Prague-blockchain_test_engine-log_argument_index_size-value_zero]-reth
- tests/prague/eip6110_deposits/test_modified_contract.py::test_invalid_layout[fork_Prague-blockchain_test_engine-log_argument_pubkey_offset-value_zero]-reth
- tests/prague/eip6110_deposits/test_modified_contract.py::test_invalid_layout[fork_Prague-blockchain_test_engine-log_argument_pubkey_size-value_zero]-reth
- tests/prague/eip6110_deposits/test_modified_contract.py::test_invalid_layout[fork_Prague-blockchain_test_engine-log_argument_signature_offset-value_zero]-reth
- tests/prague/eip6110_deposits/test_modified_contract.py::test_invalid_layout[fork_Prague-blockchain_test_engine-log_argument_signature_size-value_zero]-reth
- tests/prague/eip6110_deposits/test_modified_contract.py::test_invalid_layout[fork_Prague-blockchain_test_engine-log_argument_withdrawal_credentials_offset-value_zero]-reth
- tests/prague/eip6110_deposits/test_modified_contract.py::test_invalid_layout[fork_Prague-blockchain_test_engine-log_argument_withdrawal_credentials_size-value_zero]-reth
- tests/prague/eip7002_el_triggerable_withdrawals/test_contract_deployment.py::test_system_contract_deployment[fork_CancunToPragueAtTime15k-blockchain_test_engine-deploy_after_fork-nonzero_balance]-reth
- tests/prague/eip7002_el_triggerable_withdrawals/test_contract_deployment.py::test_system_contract_deployment[fork_CancunToPragueAtTime15k-blockchain_test_engine-deploy_after_fork-zero_balance]-reth
- tests/prague/eip6110_deposits/test_modified_contract.py::test_invalid_log_length[fork_Prague-blockchain_test_engine-slice_bytes_False]-reth
- tests/prague/eip6110_deposits/test_modified_contract.py::test_invalid_log_length[fork_Prague-blockchain_test_engine-slice_bytes_True]-reth
- tests/prague/eip6110_deposits/test_modified_contract.py::test_invalid_layout[fork_Osaka-blockchain_test_engine-log_argument_amount_offset-value_zero]-reth
- tests/prague/eip6110_deposits/test_modified_contract.py::test_invalid_layout[fork_Osaka-blockchain_test_engine-log_argument_amount_size-value_zero]-reth
- tests/prague/eip6110_deposits/test_modified_contract.py::test_invalid_layout[fork_Osaka-blockchain_test_engine-log_argument_index_offset-value_zero]-reth
- tests/prague/eip6110_deposits/test_modified_contract.py::test_invalid_layout[fork_Osaka-blockchain_test_engine-log_argument_index_size-value_zero]-reth
- tests/prague/eip6110_deposits/test_modified_contract.py::test_invalid_layout[fork_Osaka-blockchain_test_engine-log_argument_pubkey_offset-value_zero]-reth
- tests/prague/eip6110_deposits/test_modified_contract.py::test_invalid_layout[fork_Osaka-blockchain_test_engine-log_argument_pubkey_size-value_zero]-reth
- tests/prague/eip6110_deposits/test_modified_contract.py::test_invalid_layout[fork_Osaka-blockchain_test_engine-log_argument_signature_offset-value_zero]-reth
- tests/prague/eip6110_deposits/test_modified_contract.py::test_invalid_layout[fork_Osaka-blockchain_test_engine-log_argument_signature_size-value_zero]-reth
- tests/prague/eip6110_deposits/test_modified_contract.py::test_invalid_layout[fork_Osaka-blockchain_test_engine-log_argument_withdrawal_credentials_offset-value_zero]-reth
- tests/prague/eip6110_deposits/test_modified_contract.py::test_invalid_layout[fork_Osaka-blockchain_test_engine-log_argument_withdrawal_credentials_size-value_zero]-reth
- tests/prague/eip6110_deposits/test_modified_contract.py::test_invalid_log_length[fork_Osaka-blockchain_test_engine-slice_bytes_False]-reth
- tests/prague/eip6110_deposits/test_modified_contract.py::test_invalid_log_length[fork_Osaka-blockchain_test_engine-slice_bytes_True]-reth
- tests/paris/eip7610_create_collision/test_initcollision.py::test_init_collision_create_tx[fork_Osaka-tx_type_0-blockchain_test_engine_from_state_test-non-empty-balance-revert-initcode]-reth
- tests/paris/eip7610_create_collision/test_initcollision.py::test_init_collision_create_tx[fork_Prague-tx_type_0-blockchain_test_engine_from_state_test-non-empty-balance-correct-initcode]-reth
- tests/paris/eip7610_create_collision/test_initcollision.py::test_init_collision_create_tx[fork_Paris-tx_type_1-blockchain_test_engine_from_state_test-non-empty-balance-correct-initcode]-reth

View File

@@ -16,22 +16,12 @@ engine-withdrawals:
- Withdrawals Fork on Block 1 - 8 Block Re-Org NewPayload (Paris) (reth)
- Withdrawals Fork on Block 8 - 10 Block Re-Org NewPayload (Paris) (reth)
- Withdrawals Fork on Canonical Block 8 / Side Block 7 - 10 Block Re-Org (Paris) (reth)
# P2P sync timing issue in hive Docker environment: secondary client returns SYNCING but
# peer discovery/connection doesn't complete within the timeout when running with
# --sim.parallelism 16. Not a correctness bug, purely a CI timing issue.
- Sync after 2 blocks - Withdrawals on Block 2 - Multiple Withdrawal Accounts (Paris) (reth)
- Sync after 2 blocks - Withdrawals on Block 2 - Multiple Withdrawal Accounts - No Transactions (Paris) (reth)
- Sync after 128 blocks - Withdrawals on Block 2 - Multiple Withdrawal Accounts (Paris) (reth)
engine-cancun:
- Transaction Re-Org, New Payload on Revert Back (Cancun) (reth)
- Transaction Re-Org, Re-Org to Different Block (Cancun) (reth)
- Transaction Re-Org, Re-Org Out (Cancun) (reth)
- Invalid Missing Ancestor ReOrg, StateRoot, EmptyTxs=False, Invalid P9 (Cancun) (reth)
# Hive test infra bug: geth sidecar switched to PathScheme for state storage, which has
# strict trie integrity requirements incompatible with inserting intentionally invalid blocks.
# Affects all clients, not just reth. Tracked: https://github.com/ethereum/hive/issues/1382
- Invalid Missing Ancestor Syncing ReOrg, Timestamp, EmptyTxs=False, CanonicalReOrg=False, Invalid P8 (Cancun) (reth)
- Invalid Missing Ancestor Syncing ReOrg, Timestamp, EmptyTxs=False, CanonicalReOrg=True, Invalid P8 (Cancun) (reth)
- Multiple New Payloads Extending Canonical Chain, Wait for Canonical Payload (Cancun) (reth)
engine-api:
- Transaction Re-Org, Re-Org Out (Paris) (reth)

View File

@@ -6,14 +6,8 @@ cd hivetests/
sim="${1}"
limit="${2}"
# Use lower parallelism for eels tests to avoid OOM-killing the runner
parallelism=16
if [[ "${sim}" == *"eels"* ]]; then
parallelism=4
fi
run_hive() {
hive --sim "${sim}" --sim.limit "${limit}" --sim.parallelism "${parallelism}" --client reth 2>&1 | tee /tmp/log || true
hive --sim "${sim}" --sim.limit "${limit}" --sim.parallelism 16 --client reth 2>&1 | tee /tmp/log || true
}
check_log() {

File diff suppressed because it is too large Load Diff

View File

@@ -15,7 +15,7 @@ env:
jobs:
build:
runs-on: ${{ github.repository == 'paradigmxyz/reth' && 'depot-ubuntu-latest-8' || 'ubuntu-latest' }}
runs-on: depot-ubuntu-latest-8
timeout-minutes: 90
steps:
- name: Checkout

View File

@@ -25,7 +25,7 @@ env:
jobs:
check:
name: Check compilation with patched alloy
runs-on: ${{ github.repository == 'paradigmxyz/reth' && 'depot-ubuntu-latest-16' || 'ubuntu-latest' }}
runs-on: depot-ubuntu-latest-16
timeout-minutes: 60
steps:
- uses: actions/checkout@v4

View File

@@ -18,7 +18,7 @@ env:
name: compact-codec
jobs:
compact-codec:
runs-on: ${{ github.repository == 'paradigmxyz/reth' && 'depot-ubuntu-latest' || 'ubuntu-latest' }}
runs-on: depot-ubuntu-latest
strategy:
matrix:
bin:

View File

@@ -15,6 +15,7 @@ on:
jobs:
build:
if: github.repository == 'paradigmxyz/reth'
timeout-minutes: 45
runs-on: ubuntu-latest
permissions:
@@ -30,22 +31,10 @@ jobs:
echo "sha=${{ github.sha }}" >> "$GITHUB_OUTPUT"
echo "describe=$(git describe --always --tags)" >> "$GITHUB_OUTPUT"
- name: Detect fork
id: fork
run: |
if [ "${{ github.event_name }}" = "pull_request" ] && [ "${{ github.event.pull_request.head.repo.full_name }}" != "${{ github.repository }}" ]; then
echo "is_fork=true" >> "$GITHUB_OUTPUT"
else
echo "is_fork=false" >> "$GITHUB_OUTPUT"
fi
# Depot build (upstream only)
- name: Set up Depot CLI
if: steps.fork.outputs.is_fork == 'false'
uses: depot/setup-action@v1
- name: Build reth image (Depot)
if: steps.fork.outputs.is_fork == 'false'
- name: Build reth image
uses: depot/bake-action@v1
env:
DEPOT_TOKEN: ${{ secrets.DEPOT_TOKEN }}
@@ -57,24 +46,6 @@ jobs:
targets: ${{ inputs.hive_target }}
push: false
# Docker build (forks)
- name: Set up Docker Buildx
if: steps.fork.outputs.is_fork == 'true'
uses: docker/setup-buildx-action@v3
- name: Build reth image (Docker)
if: steps.fork.outputs.is_fork == 'true'
uses: docker/bake-action@v6
env:
VERGEN_GIT_SHA: ${{ steps.git.outputs.sha }}
VERGEN_GIT_DESCRIBE: ${{ steps.git.outputs.describe }}
with:
files: docker-bake.hcl
targets: ${{ inputs.hive_target }}
push: false
set: |
*.dockerfile=Dockerfile
- name: Upload reth image
uses: actions/upload-artifact@v6
with:

View File

@@ -20,7 +20,7 @@ concurrency:
jobs:
test:
name: e2e-testsuite
runs-on: ${{ github.repository == 'paradigmxyz/reth' && 'depot-ubuntu-latest-4' || 'ubuntu-latest' }}
runs-on: depot-ubuntu-latest-4
env:
RUST_BACKTRACE: 1
timeout-minutes: 90
@@ -47,7 +47,7 @@ jobs:
rocksdb:
name: e2e-rocksdb
runs-on: ${{ github.repository == 'paradigmxyz/reth' && 'depot-ubuntu-latest-4' || 'ubuntu-latest' }}
runs-on: depot-ubuntu-latest-4
env:
RUST_BACKTRACE: 1
timeout-minutes: 60

View File

@@ -32,7 +32,7 @@ jobs:
prepare-hive:
if: github.repository == 'paradigmxyz/reth'
timeout-minutes: 45
runs-on: ${{ github.repository == 'paradigmxyz/reth' && 'depot-ubuntu-latest-4' || 'ubuntu-latest' }}
runs-on: depot-ubuntu-latest-4
steps:
- uses: actions/checkout@v6
- name: Checkout hive tests
@@ -188,8 +188,7 @@ jobs:
- build-reth-edge
- prepare-hive
name: ${{ matrix.storage }} / ${{ matrix.scenario.sim }}${{ matrix.scenario.limit && format(' - {0}', matrix.scenario.limit) }}
# Use larger runners for eels tests to avoid OOM runner crashes
runs-on: ${{ github.repository == 'paradigmxyz/reth' && (contains(matrix.scenario.sim, 'eels') && 'depot-ubuntu-latest-8' || 'depot-ubuntu-latest-4') || 'ubuntu-latest' }}
runs-on: depot-ubuntu-latest-4
permissions:
issues: write
steps:

View File

@@ -24,7 +24,7 @@ jobs:
test:
name: test / ${{ matrix.network }} / ${{ matrix.storage }}
if: github.event_name != 'schedule'
runs-on: ${{ github.repository == 'paradigmxyz/reth' && 'depot-ubuntu-latest-4' || 'ubuntu-latest' }}
runs-on: depot-ubuntu-latest-4
env:
RUST_BACKTRACE: 1
strategy:

View File

@@ -31,7 +31,7 @@ jobs:
strategy:
fail-fast: false
name: run kurtosis
runs-on: ${{ github.repository == 'paradigmxyz/reth' && 'depot-ubuntu-latest' || 'ubuntu-latest' }}
runs-on: depot-ubuntu-latest
needs:
- build-reth
steps:

View File

@@ -13,7 +13,7 @@ env:
jobs:
clippy-binaries:
name: clippy binaries / ${{ matrix.type }}
runs-on: ${{ github.repository == 'paradigmxyz/reth' && 'depot-ubuntu-latest' || 'ubuntu-latest' }}
runs-on: depot-ubuntu-latest
timeout-minutes: 30
strategy:
matrix:
@@ -42,7 +42,7 @@ jobs:
clippy:
name: clippy
runs-on: ${{ github.repository == 'paradigmxyz/reth' && 'depot-ubuntu-latest' || 'ubuntu-latest' }}
runs-on: depot-ubuntu-latest
timeout-minutes: 30
steps:
- uses: actions/checkout@v6
@@ -59,7 +59,7 @@ jobs:
RUSTFLAGS: -D warnings
wasm:
runs-on: ${{ github.repository == 'paradigmxyz/reth' && 'depot-ubuntu-latest' || 'ubuntu-latest' }}
runs-on: depot-ubuntu-latest
timeout-minutes: 30
steps:
- uses: actions/checkout@v6
@@ -79,7 +79,7 @@ jobs:
.github/scripts/check_wasm.sh
riscv:
runs-on: ${{ github.repository == 'paradigmxyz/reth' && 'depot-ubuntu-latest' || 'ubuntu-latest' }}
runs-on: depot-ubuntu-latest
timeout-minutes: 60
steps:
- uses: actions/checkout@v6
@@ -98,7 +98,7 @@ jobs:
crate-checks:
name: crate-checks (${{ matrix.partition }}/${{ matrix.total_partitions }})
runs-on: ${{ github.repository == 'paradigmxyz/reth' && 'depot-ubuntu-latest-4' || 'ubuntu-latest' }}
runs-on: depot-ubuntu-latest-4
strategy:
matrix:
partition: [1, 2, 3]
@@ -117,7 +117,7 @@ jobs:
msrv:
name: MSRV
runs-on: ${{ github.repository == 'paradigmxyz/reth' && 'depot-ubuntu-latest' || 'ubuntu-latest' }}
runs-on: depot-ubuntu-latest
timeout-minutes: 30
steps:
- uses: actions/checkout@v6
@@ -135,7 +135,7 @@ jobs:
docs:
name: docs
runs-on: ${{ github.repository == 'paradigmxyz/reth' && 'depot-ubuntu-latest-4' || 'ubuntu-latest' }}
runs-on: depot-ubuntu-latest-4
timeout-minutes: 30
steps:
- uses: actions/checkout@v6
@@ -153,7 +153,7 @@ jobs:
fmt:
name: fmt
runs-on: ${{ github.repository == 'paradigmxyz/reth' && 'depot-ubuntu-latest' || 'ubuntu-latest' }}
runs-on: depot-ubuntu-latest
timeout-minutes: 30
steps:
- uses: actions/checkout@v6
@@ -167,7 +167,7 @@ jobs:
udeps:
name: udeps
runs-on: ${{ github.repository == 'paradigmxyz/reth' && 'depot-ubuntu-latest' || 'ubuntu-latest' }}
runs-on: depot-ubuntu-latest
timeout-minutes: 30
steps:
- uses: actions/checkout@v6
@@ -182,7 +182,7 @@ jobs:
book:
name: book
runs-on: ${{ github.repository == 'paradigmxyz/reth' && 'depot-ubuntu-latest' || 'ubuntu-latest' }}
runs-on: depot-ubuntu-latest
timeout-minutes: 30
steps:
- uses: actions/checkout@v6
@@ -240,7 +240,7 @@ jobs:
# Checks that selected crates can compile with power set of features
features:
name: features
runs-on: ${{ github.repository == 'paradigmxyz/reth' && 'depot-ubuntu-latest' || 'ubuntu-latest' }}
runs-on: depot-ubuntu-latest
timeout-minutes: 30
steps:
- uses: actions/checkout@v6
@@ -264,7 +264,7 @@ jobs:
# Check crates correctly propagate features
feature-propagation:
runs-on: ${{ github.repository == 'paradigmxyz/reth' && 'depot-ubuntu-latest' || 'ubuntu-latest' }}
runs-on: depot-ubuntu-latest
timeout-minutes: 20
steps:
- uses: actions/checkout@v6

View File

@@ -74,7 +74,7 @@ jobs:
profile: maxperf
allow_fail: false
- target: aarch64-unknown-linux-gnu
os: ubuntu-24.04-arm
os: ubuntu-24.04
profile: maxperf
allow_fail: false
- target: x86_64-apple-darwin
@@ -85,6 +85,10 @@ jobs:
os: macos-14
profile: maxperf
allow_fail: false
- target: riscv64gc-unknown-linux-gnu
os: ubuntu-24.04
profile: maxperf
allow_fail: true
build:
- command: build
binary: reth
@@ -98,7 +102,7 @@ jobs:
- name: Install cross main
id: cross_main
run: |
cargo install cross --locked --git https://github.com/cross-rs/cross
cargo install cross --git https://github.com/cross-rs/cross
- uses: Swatinem/rust-cache@v2
with:
cache-on-failure: true

View File

@@ -23,7 +23,7 @@ jobs:
name: stage-run-test
# Only run stage commands test in merge groups
if: github.event_name == 'merge_group'
runs-on: ${{ github.repository == 'paradigmxyz/reth' && 'depot-ubuntu-latest' || 'ubuntu-latest' }}
runs-on: depot-ubuntu-latest
env:
RUST_LOG: info,sync=error
RUST_BACKTRACE: 1
@@ -38,7 +38,7 @@ jobs:
cache-on-failure: true
- name: Build reth
run: |
cargo install --locked --path bin/reth
cargo install --path bin/reth
- name: Run headers stage
run: |
reth stage run headers --from ${{ env.FROM_BLOCK }} --to ${{ env.TO_BLOCK }} --commit --checkpoints

View File

@@ -19,7 +19,7 @@ jobs:
sync:
if: github.repository == 'paradigmxyz/reth'
name: sync (${{ matrix.chain.bin }})
runs-on: ${{ github.repository == 'paradigmxyz/reth' && 'depot-ubuntu-latest' || 'ubuntu-latest' }}
runs-on: depot-ubuntu-latest
env:
RUST_LOG: info,sync=error
RUST_BACKTRACE: 1

View File

@@ -19,7 +19,7 @@ jobs:
sync:
if: github.repository == 'paradigmxyz/reth'
name: sync (${{ matrix.chain.bin }})
runs-on: ${{ github.repository == 'paradigmxyz/reth' && 'depot-ubuntu-latest' || 'ubuntu-latest' }}
runs-on: depot-ubuntu-latest
env:
RUST_LOG: info,sync=error
RUST_BACKTRACE: 1

View File

@@ -20,7 +20,7 @@ concurrency:
jobs:
test:
name: test / ${{ matrix.type }} / ${{ matrix.storage }}
runs-on: ${{ github.repository == 'paradigmxyz/reth' && 'depot-ubuntu-latest-4' || 'ubuntu-latest' }}
runs-on: depot-ubuntu-latest-4
env:
RUST_BACKTRACE: 1
EDGE_FEATURES: ${{ matrix.storage == 'edge' && 'edge' || '' }}
@@ -57,7 +57,7 @@ jobs:
state:
name: Ethereum state tests
runs-on: ${{ github.repository == 'paradigmxyz/reth' && 'depot-ubuntu-latest-4' || 'ubuntu-latest' }}
runs-on: depot-ubuntu-latest-4
env:
RUST_LOG: info,sync=error
RUST_BACKTRACE: 1
@@ -92,7 +92,7 @@ jobs:
doc:
name: doc tests
runs-on: ${{ github.repository == 'paradigmxyz/reth' && 'depot-ubuntu-latest' || 'ubuntu-latest' }}
runs-on: depot-ubuntu-latest
env:
RUST_BACKTRACE: 1
timeout-minutes: 30

View File

@@ -172,22 +172,7 @@ Before submitting changes, ensure:
2. **Clippy**: No warnings
3. **Tests Pass**: All unit and integration tests
4. **Documentation**: Update relevant docs and add doc comments with `cargo docs --document-private-items`
5. **CLI Docs** (if CLI changed): Run `make update-book-cli` (see below)
6. **Commit Messages**: Follow conventional format (feat:, fix:, chore:, etc.)
### CLI Reference Docs (`book` CI Job)
The CLI reference pages under `docs/vocs/docs/pages/cli/` are **auto-generated** from the `reth` binary's `--help` output. **Do not edit these files manually** — any hand edits will be overwritten and CI will fail regardless.
When you add, remove, or modify CLI commands, subcommands, or flags, regenerate the CLI docs by running:
```bash
make update-book-cli
```
This builds `reth` in debug mode and runs `docs/cli/update.sh` to regenerate all CLI pages. Commit the resulting changes.
The `book` CI job (`.github/workflows/lint.yml`) enforces this by regenerating the docs and running `git diff --exit-code`. If the committed docs don't match the generated output, CI fails. Manually editing these pages is never productive — always use `make update-book-cli`.
5. **Commit Messages**: Follow conventional format (feat:, fix:, chore:, etc.)
### Opening PRs against <https://github.com/paradigmxyz/reth>
@@ -328,74 +313,6 @@ GLOBAL_COUNTER.fetch_add(1, Ordering::SeqCst);
Before adding a comment, ask: Would someone reading just the current code (no PR, no history) find this helpful?
#### Rust Style Guides
##### Type Ordering in Files
When defining structs, traits, and functions in a file, follow this ordering convention. The file's primary type (matching the file name) comes first, followed by supporting public types, then private types and helpers.
```rust
use ...;
/// The primary type of this file (matches filename).
pub struct PayloadProcessor { ... }
impl PayloadProcessor { ... }
// Followed by public auxiliary types that support the primary type
/// Configuration for the processor.
pub struct PayloadProcessorConfig { ... }
/// Result type returned by processor operations.
pub struct ProcessorResult { ... }
// Followed by public traits related to the primary type
pub trait ProcessorExt { ... }
// Followed by private helper types
struct InternalState { ... }
// Followed by private helper functions
fn validate_input() { ... }
```
❌ **Bad**: Adding new traits and auxiliary types **above** the file's primary type (see [#22133](https://github.com/paradigmxyz/reth/pull/22133)):
```rust
use ...;
// ❌ BAD - new auxiliary struct added before the file's main type
pub struct CacheWaitDurations { ... }
// ❌ BAD - new trait added before the file's main type
pub trait WaitForCaches { ... }
// The file's primary type is buried below unrelated additions
pub struct PayloadProcessor { ... }
```
✅ **Good**: New types go **after** the primary type:
```rust
use ...;
// ✅ The file's primary type stays at the top
pub struct PayloadProcessor { ... }
impl PayloadProcessor { ... }
// ✅ Auxiliary types follow the primary type
pub struct CacheWaitDurations { ... }
pub trait WaitForCaches { ... }
impl WaitForCaches for PayloadProcessor { ... }
```
### Example Contribution Workflow
Let's say you want to fix a bug where external IP resolution fails on startup:
@@ -470,8 +387,5 @@ cargo build --release
cargo check --workspace --all-features
# Check documentation
cargo docs --document-private-items
# Regenerate CLI reference docs (after CLI changes)
make update-book-cli
cargo docs --document-private-items
```

872
Cargo.lock generated

File diff suppressed because it is too large Load Diff

View File

@@ -1,5 +1,5 @@
[workspace.package]
version = "1.11.1"
version = "1.11.0"
edition = "2024"
rust-version = "1.93"
license = "MIT OR Apache-2.0"
@@ -138,7 +138,6 @@ members = [
"examples/exex-subscription",
"examples/exex-test",
"examples/full-contract-state",
"examples/migrate-trie-to-packed",
"examples/manual-p2p/",
"examples/network-txpool/",
"examples/network/",
@@ -398,7 +397,7 @@ reth-payload-builder-primitives = { path = "crates/payload/builder-primitives" }
reth-payload-primitives = { path = "crates/payload/primitives" }
reth-payload-validator = { path = "crates/payload/validator" }
reth-payload-util = { path = "crates/payload/util" }
reth-primitives = { path = "crates/primitives", default-features = false, features = ["__internal"] }
reth-primitives = { path = "crates/primitives", default-features = false }
reth-primitives-traits = { path = "crates/primitives-traits", default-features = false }
reth-provider = { path = "crates/storage/provider" }
reth-prune = { path = "crates/prune/prune" }
@@ -460,33 +459,33 @@ alloy-trie = { version = "0.9.4", default-features = false }
alloy-hardforks = "0.4.5"
alloy-consensus = { version = "1.7.3", default-features = false }
alloy-contract = { version = "1.7.3", default-features = false }
alloy-eips = { version = "1.7.3", default-features = false }
alloy-genesis = { version = "1.7.3", default-features = false }
alloy-json-rpc = { version = "1.7.3", default-features = false }
alloy-network = { version = "1.7.3", default-features = false }
alloy-network-primitives = { version = "1.7.3", default-features = false }
alloy-provider = { version = "1.7.3", features = ["reqwest", "debug-api"], default-features = false }
alloy-pubsub = { version = "1.7.3", default-features = false }
alloy-rpc-client = { version = "1.7.3", default-features = false }
alloy-rpc-types = { version = "1.7.3", features = ["eth"], default-features = false }
alloy-rpc-types-admin = { version = "1.7.3", default-features = false }
alloy-rpc-types-anvil = { version = "1.7.3", default-features = false }
alloy-rpc-types-beacon = { version = "1.7.3", default-features = false }
alloy-rpc-types-debug = { version = "1.7.3", default-features = false }
alloy-rpc-types-engine = { version = "1.7.3", default-features = false }
alloy-rpc-types-eth = { version = "1.7.3", default-features = false }
alloy-rpc-types-mev = { version = "1.7.3", default-features = false }
alloy-rpc-types-trace = { version = "1.7.3", default-features = false }
alloy-rpc-types-txpool = { version = "1.7.3", default-features = false }
alloy-serde = { version = "1.7.3", default-features = false }
alloy-signer = { version = "1.7.3", default-features = false }
alloy-signer-local = { version = "1.7.3", default-features = false }
alloy-transport = { version = "1.7.3" }
alloy-transport-http = { version = "1.7.3", features = ["reqwest-rustls-tls"], default-features = false }
alloy-transport-ipc = { version = "1.7.3", default-features = false }
alloy-transport-ws = { version = "1.7.3", default-features = false }
alloy-consensus = { version = "1.6.3", default-features = false }
alloy-contract = { version = "1.6.3", default-features = false }
alloy-eips = { version = "1.6.3", default-features = false }
alloy-genesis = { version = "1.6.3", default-features = false }
alloy-json-rpc = { version = "1.6.3", default-features = false }
alloy-network = { version = "1.6.3", default-features = false }
alloy-network-primitives = { version = "1.6.3", default-features = false }
alloy-provider = { version = "1.6.3", features = ["reqwest", "debug-api"], default-features = false }
alloy-pubsub = { version = "1.6.3", default-features = false }
alloy-rpc-client = { version = "1.6.3", default-features = false }
alloy-rpc-types = { version = "1.6.3", features = ["eth"], default-features = false }
alloy-rpc-types-admin = { version = "1.6.3", default-features = false }
alloy-rpc-types-anvil = { version = "1.6.3", default-features = false }
alloy-rpc-types-beacon = { version = "1.6.3", default-features = false }
alloy-rpc-types-debug = { version = "1.6.3", default-features = false }
alloy-rpc-types-engine = { version = "1.6.3", default-features = false }
alloy-rpc-types-eth = { version = "1.6.3", default-features = false }
alloy-rpc-types-mev = { version = "1.6.3", default-features = false }
alloy-rpc-types-trace = { version = "1.6.3", default-features = false }
alloy-rpc-types-txpool = { version = "1.6.3", default-features = false }
alloy-serde = { version = "1.6.3", default-features = false }
alloy-signer = { version = "1.6.3", default-features = false }
alloy-signer-local = { version = "1.6.3", default-features = false }
alloy-transport = { version = "1.6.3" }
alloy-transport-http = { version = "1.6.3", features = ["reqwest-rustls-tls"], default-features = false }
alloy-transport-ipc = { version = "1.6.3", default-features = false }
alloy-transport-ws = { version = "1.6.3", default-features = false }
# op
alloy-op-evm = { version = "0.27.2", default-features = false }
@@ -529,17 +528,16 @@ notify = { version = "8.0.0", default-features = false, features = ["macos_fseve
nybbles = { version = "0.4.8", default-features = false }
once_cell = { version = "1.19", default-features = false, features = ["critical-section"] }
parking_lot = "0.12"
quanta = "0.12"
paste = "1.0"
rand = "0.9"
rayon = "1.7"
thread-priority = "3.0.0"
rustc-hash = { version = "2.0", default-features = false }
schnellru = "0.2"
serde = { version = "1.0", default-features = false }
serde_json = { version = "1.0", default-features = false, features = ["alloc"] }
serde_with = { version = "3", default-features = false, features = ["macros"] }
sha2 = { version = "0.10", default-features = false }
shellexpand = "3.0.0"
shlex = "1.3"
smallvec = "1"
strum = { version = "0.27", default-features = false }
@@ -652,7 +650,7 @@ ethereum_ssz_derive = "0.10.1"
jemalloc_pprof = { version = "0.8", default-features = false }
tikv-jemalloc-ctl = "0.6"
tikv-jemallocator = "0.6"
tracy-client = { version = "0.18.0", features = ["demangle"] }
tracy-client = "0.18.0"
snmalloc-rs = { version = "0.3.7", features = ["build_cc"] }
aes = "0.8.1"

View File

@@ -1,6 +1,6 @@
# syntax=docker.io/docker/dockerfile:1.7-labs
FROM lukemathwalker/cargo-chef:latest-rust-1.93 AS chef
FROM lukemathwalker/cargo-chef:latest-rust-1 AS chef
WORKDIR /app
LABEL org.opencontainers.image.source=https://github.com/paradigmxyz/reth

View File

@@ -4,7 +4,7 @@
# Usage:
# reth: --build-arg BINARY=reth
FROM rust:1.93 AS builder
FROM rust:1 AS builder
WORKDIR /app
LABEL org.opencontainers.image.source=https://github.com/paradigmxyz/reth

View File

@@ -80,7 +80,7 @@ build-native-%:
#
# These commands require that:
#
# - `cross` is installed (`cargo install --locked cross`).
# - `cross` is installed (`cargo install cross`).
# - Docker is running.
# - The current user is in the `docker` group.
#
@@ -261,7 +261,7 @@ lint-typos: ensure-typos
ensure-typos:
@if ! command -v typos &> /dev/null; then \
echo "typos not found. Please install it by running the command 'cargo install --locked typos-cli' or refer to the following link for more information: https://github.com/crate-ci/typos"; \
echo "typos not found. Please install it by running the command 'cargo install typos-cli' or refer to the following link for more information: https://github.com/crate-ci/typos"; \
exit 1; \
fi

View File

@@ -45,6 +45,9 @@ serde_json.workspace = true
# Time handling
chrono = { workspace = true, features = ["serde"] }
# Path manipulation
shellexpand.workspace = true
# CSV handling
csv.workspace = true

View File

@@ -289,7 +289,11 @@ impl Args {
/// Get the JWT secret path - either provided or derived from datadir
pub(crate) fn jwt_secret_path(&self) -> PathBuf {
match &self.jwt_secret {
Some(path) => path.clone(),
Some(path) => {
let jwt_secret_str = path.to_string_lossy();
let expanded = shellexpand::tilde(&jwt_secret_str);
PathBuf::from(expanded.as_ref())
}
None => {
// Use the same logic as reth: <datadir>/<chain>/jwt.hex
let chain_path = self.datadir.clone().resolve_datadir(self.chain);
@@ -304,9 +308,10 @@ impl Args {
chain_path.data_dir().to_path_buf()
}
/// Get the output directory path
/// Get the expanded output directory path
pub(crate) fn output_dir_path(&self) -> PathBuf {
PathBuf::from(&self.output_dir)
let expanded = shellexpand::tilde(&self.output_dir);
PathBuf::from(expanded.as_ref())
}
/// Get the effective warmup blocks value - either specified or defaults to blocks

View File

@@ -29,8 +29,6 @@ pub(crate) struct BenchContext {
pub(crate) next_block: u64,
/// Whether the chain is an OP rollup.
pub(crate) is_optimism: bool,
/// Whether to use `reth_newPayload` endpoint instead of `engine_newPayload*`.
pub(crate) use_reth_namespace: bool,
}
impl BenchContext {
@@ -142,14 +140,6 @@ impl BenchContext {
};
let next_block = first_block.header.number + 1;
let use_reth_namespace = bench_args.reth_new_payload;
Ok(Self {
auth_provider,
block_provider,
benchmark_mode,
next_block,
is_optimism,
use_reth_namespace,
})
Ok(Self { auth_provider, block_provider, benchmark_mode, next_block, is_optimism })
}
}

View File

@@ -6,9 +6,7 @@ use crate::{
helpers::{build_payload, parse_gas_limit, prepare_payload_request, rpc_block_to_header},
output::GasRampPayloadFile,
},
valid_payload::{
call_forkchoice_updated_with_reth, call_new_payload_with_reth, payload_to_new_payload,
},
valid_payload::{call_forkchoice_updated, call_new_payload, payload_to_new_payload},
};
use alloy_eips::BlockNumberOrTag;
use alloy_provider::{network::AnyNetwork, Provider, RootProvider};
@@ -49,14 +47,6 @@ pub struct Command {
/// Output directory for benchmark results and generated payloads.
#[arg(long, value_name = "OUTPUT")]
output: PathBuf,
/// Use `reth_newPayload` endpoint instead of `engine_newPayload*`.
///
/// The `reth_newPayload` endpoint is a reth-specific extension that takes `ExecutionData`
/// directly, waits for persistence and cache updates to complete before processing,
/// and returns server-side timing breakdowns (latency, persistence wait, cache wait).
#[arg(long, default_value = "false", verbatim_doc_comment)]
reth_new_payload: bool,
}
/// Mode for determining when to stop ramping.
@@ -148,9 +138,6 @@ impl Command {
);
}
}
if self.reth_new_payload {
info!("Using reth_newPayload and reth_forkchoiceUpdated endpoints");
}
let mut blocks_processed = 0u64;
let total_benchmark_duration = Instant::now();
@@ -176,7 +163,7 @@ impl Command {
// Regenerate the payload from the modified block, but keep the original sidecar
// which contains the actual execution requests data (not just the hash)
let (payload, _) = ExecutionPayload::from_block_unchecked(block_hash, &block);
let (version, params, execution_data) = payload_to_new_payload(
let (version, params) = payload_to_new_payload(
payload,
sidecar,
false,
@@ -187,31 +174,20 @@ impl Command {
// Save payload to file with version info for replay
let payload_path =
self.output.join(format!("payload_block_{}.json", block.header.number));
let file = GasRampPayloadFile {
version: version as u8,
block_hash,
params: params.clone(),
execution_data: Some(execution_data.clone()),
};
let file =
GasRampPayloadFile { version: version as u8, block_hash, params: params.clone() };
let payload_json = serde_json::to_string_pretty(&file)?;
std::fs::write(&payload_path, &payload_json)?;
info!(target: "reth-bench", block_number = block.header.number, path = %payload_path.display(), "Saved payload");
let reth_data = self.reth_new_payload.then_some(execution_data);
let _ = call_new_payload_with_reth(&provider, version, params, reth_data).await?;
call_new_payload(&provider, version, params).await?;
let forkchoice_state = ForkchoiceState {
head_block_hash: block_hash,
safe_block_hash: block_hash,
finalized_block_hash: block_hash,
};
call_forkchoice_updated_with_reth(
&provider,
version,
forkchoice_state,
self.reth_new_payload,
)
.await?;
call_forkchoice_updated(&provider, version, forkchoice_state, None).await?;
parent_header = block.header;
parent_hash = block_hash;

View File

@@ -1,156 +0,0 @@
//! Prometheus metrics scraper for reth-bench.
//!
//! Scrapes a node's Prometheus metrics endpoint after each block to record
//! execution and state root durations with block-level granularity.
use csv::Writer;
use eyre::Context;
use reqwest::Client;
use serde::Serialize;
use std::{path::Path, time::Duration};
use tracing::info;
/// Suffix for the metrics CSV output file.
pub(crate) const METRICS_OUTPUT_SUFFIX: &str = "metrics.csv";
/// A single row of scraped prometheus metrics for one block.
#[derive(Debug, Clone, Serialize)]
pub(crate) struct MetricsRow {
/// The block number.
pub(crate) block_number: u64,
/// EVM execution duration in seconds (from `sync_execution_execution_duration` gauge).
pub(crate) execution_duration_secs: Option<f64>,
/// State root computation duration in seconds (from
/// `sync_block_validation_state_root_duration` gauge).
pub(crate) state_root_duration_secs: Option<f64>,
}
/// Scrapes a Prometheus metrics endpoint after each block to collect
/// execution and state root durations.
pub(crate) struct MetricsScraper {
/// The full URL of the Prometheus metrics endpoint.
url: String,
/// Reusable HTTP client.
client: Client,
/// Collected metrics rows, one per block.
rows: Vec<MetricsRow>,
}
impl MetricsScraper {
/// Creates a new scraper if a URL is provided.
pub(crate) fn maybe_new(url: Option<String>) -> Option<Self> {
url.map(|url| {
info!(target: "reth-bench", %url, "Prometheus metrics scraping enabled");
let client = Client::builder()
.timeout(Duration::from_secs(5))
.build()
.expect("failed to build reqwest client");
Self { url, client, rows: Vec::new() }
})
}
/// Scrapes the metrics endpoint and records values for the given block.
pub(crate) async fn scrape_after_block(&mut self, block_number: u64) -> eyre::Result<()> {
let text = self
.client
.get(&self.url)
.send()
.await
.wrap_err("failed to fetch metrics endpoint")?
.error_for_status()
.wrap_err("metrics endpoint returned error status")?
.text()
.await
.wrap_err("failed to read metrics response body")?;
let execution = parse_gauge(&text, "sync_execution_execution_duration");
let state_root = parse_gauge(&text, "sync_block_validation_state_root_duration");
self.rows.push(MetricsRow {
block_number,
execution_duration_secs: execution,
state_root_duration_secs: state_root,
});
Ok(())
}
/// Writes collected metrics to a CSV file in the output directory.
pub(crate) fn write_csv(&self, output_dir: &Path) -> eyre::Result<()> {
let path = output_dir.join(METRICS_OUTPUT_SUFFIX);
info!(target: "reth-bench", "Writing scraped metrics to file: {:?}", path);
let mut writer = Writer::from_path(&path)?;
for row in &self.rows {
writer.serialize(row)?;
}
writer.flush()?;
Ok(())
}
}
/// Parses a Prometheus gauge value from exposition-format text.
///
/// Searches for lines starting with `name` followed by either a space or `{`
/// (for labeled metrics), then parses the numeric value. Returns the last
/// matching sample to handle metrics emitted with multiple label sets.
fn parse_gauge(text: &str, name: &str) -> Option<f64> {
let mut result = None;
for line in text.lines() {
let line = line.trim();
if line.is_empty() || line.starts_with('#') {
continue;
}
if !line.starts_with(name) {
continue;
}
// Ensure we match the full metric name, not a prefix of another metric.
let rest = &line[name.len()..];
if !rest.starts_with(' ') && !rest.starts_with('{') {
continue;
}
// Format: `metric_name{labels} value [timestamp]` or `metric_name value [timestamp]`
// Value is always the second whitespace-separated token.
let mut parts = line.split_whitespace();
if let Some(value_str) = parts.nth(1) &&
let Ok(v) = value_str.parse::<f64>()
{
result = Some(v);
}
}
result
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_parse_gauge_simple() {
let text = r#"# HELP sync_execution_execution_duration Duration of execution
# TYPE sync_execution_execution_duration gauge
sync_execution_execution_duration 0.123456
"#;
assert_eq!(parse_gauge(text, "sync_execution_execution_duration"), Some(0.123456));
}
#[test]
fn test_parse_gauge_missing() {
let text = "some_other_metric 1.0\n";
assert_eq!(parse_gauge(text, "sync_execution_execution_duration"), None);
}
#[test]
fn test_parse_gauge_with_labels() {
let text = "sync_block_validation_state_root_duration{instance=\"node1\"} 0.5\n";
assert_eq!(parse_gauge(text, "sync_block_validation_state_root_duration"), Some(0.5));
}
#[test]
fn test_parse_gauge_prefix_no_false_match() {
let text =
"sync_execution_execution_duration_total 99.0\nsync_execution_execution_duration 0.5\n";
assert_eq!(parse_gauge(text, "sync_execution_execution_duration"), Some(0.5));
}
}

View File

@@ -12,7 +12,6 @@ pub(crate) mod helpers;
pub use generate_big_block::{
RawTransaction, RpcTransactionSource, TransactionCollector, TransactionSource,
};
pub(crate) mod metrics_scraper;
mod new_payload_fcu;
mod new_payload_only;
mod output;

View File

@@ -13,7 +13,6 @@ use crate::{
bench::{
context::BenchContext,
helpers::parse_duration,
metrics_scraper::MetricsScraper,
output::{
write_benchmark_results, CombinedResult, NewPayloadResult, TotalGasOutput, TotalGasRow,
},
@@ -21,9 +20,7 @@ use crate::{
derive_ws_rpc_url, setup_persistence_subscription, PersistenceWaiter,
},
},
valid_payload::{
block_to_new_payload, call_forkchoice_updated_with_reth, call_new_payload_with_reth,
},
valid_payload::{block_to_new_payload, call_forkchoice_updated, call_new_payload},
};
use alloy_provider::Provider;
use alloy_rpc_types_engine::ForkchoiceState;
@@ -33,7 +30,7 @@ use reth_cli_runner::CliContext;
use reth_engine_primitives::config::DEFAULT_PERSISTENCE_THRESHOLD;
use reth_node_core::args::BenchmarkArgs;
use std::time::{Duration, Instant};
use tracing::{debug, info, warn};
use tracing::{debug, info};
/// `reth benchmark new-payload-fcu` command
#[derive(Debug, Parser)]
@@ -153,17 +150,10 @@ impl Command {
auth_provider,
mut next_block,
is_optimism,
use_reth_namespace,
..
} = BenchContext::new(&self.benchmark, self.rpc_url).await?;
let total_blocks = benchmark_mode.total_blocks();
let mut metrics_scraper = MetricsScraper::maybe_new(self.benchmark.metrics_url.clone());
if use_reth_namespace {
info!("Using reth_newPayload and reth_forkchoiceUpdated endpoints");
}
let buffer_size = self.rpc_block_buffer_size;
// Use a oneshot channel to propagate errors from the spawned task
@@ -240,46 +230,16 @@ impl Command {
finalized_block_hash: finalized,
};
let (version, params, execution_data) = block_to_new_payload(block, is_optimism)?;
let (version, params) = block_to_new_payload(block, is_optimism)?;
let start = Instant::now();
let reth_data = use_reth_namespace.then_some(execution_data);
let server_timings =
call_new_payload_with_reth(&auth_provider, version, params, reth_data).await?;
call_new_payload(&auth_provider, version, params).await?;
let np_latency =
server_timings.as_ref().map(|t| t.latency).unwrap_or_else(|| start.elapsed());
let new_payload_result = NewPayloadResult {
gas_used,
latency: np_latency,
persistence_wait: server_timings.as_ref().and_then(|t| t.persistence_wait),
execution_cache_wait: server_timings
.as_ref()
.map(|t| t.execution_cache_wait)
.unwrap_or_default(),
sparse_trie_wait: server_timings
.as_ref()
.map(|t| t.sparse_trie_wait)
.unwrap_or_default(),
};
let new_payload_result = NewPayloadResult { gas_used, latency: start.elapsed() };
let fcu_start = Instant::now();
call_forkchoice_updated_with_reth(
&auth_provider,
version,
forkchoice_state,
use_reth_namespace,
)
.await?;
let fcu_latency = fcu_start.elapsed();
call_forkchoice_updated(&auth_provider, version, forkchoice_state, None).await?;
let total_latency = if server_timings.is_some() {
// When using server-side latency for newPayload, derive total from the
// independently measured components to avoid mixing server-side and
// client-side (network-inclusive) timings.
np_latency + fcu_latency
} else {
start.elapsed()
};
let total_latency = start.elapsed();
let fcu_latency = total_latency - new_payload_result.latency;
let combined_result = CombinedResult {
block_number,
gas_limit,
@@ -299,12 +259,6 @@ impl Command {
};
info!(target: "reth-bench", progress, %combined_result);
if let Some(scraper) = metrics_scraper.as_mut() &&
let Err(err) = scraper.scrape_after_block(block_number).await
{
warn!(target: "reth-bench", %err, block_number, "Failed to scrape metrics");
}
if let Some(w) = &mut waiter {
w.on_block(block_number).await?;
}
@@ -330,10 +284,6 @@ impl Command {
write_benchmark_results(path, &gas_output_results, &combined_results)?;
}
if let (Some(path), Some(scraper)) = (&self.benchmark.output, &metrics_scraper) {
scraper.write_csv(path)?;
}
let gas_output =
TotalGasOutput::with_combined_results(gas_output_results, &combined_results)?;

View File

@@ -3,13 +3,12 @@
use crate::{
bench::{
context::BenchContext,
metrics_scraper::MetricsScraper,
output::{
NewPayloadResult, TotalGasOutput, TotalGasRow, GAS_OUTPUT_SUFFIX,
NEW_PAYLOAD_OUTPUT_SUFFIX,
},
},
valid_payload::{block_to_new_payload, call_new_payload_with_reth},
valid_payload::{block_to_new_payload, call_new_payload},
};
use alloy_provider::Provider;
use clap::Parser;
@@ -50,17 +49,10 @@ impl Command {
auth_provider,
mut next_block,
is_optimism,
use_reth_namespace,
..
} = BenchContext::new(&self.benchmark, self.rpc_url).await?;
let total_blocks = benchmark_mode.total_blocks();
let mut metrics_scraper = MetricsScraper::maybe_new(self.benchmark.metrics_url.clone());
if use_reth_namespace {
info!("Using reth_newPayload endpoint");
}
let buffer_size = self.rpc_block_buffer_size;
// Use a oneshot channel to propagate errors from the spawned task
@@ -108,28 +100,12 @@ impl Command {
debug!(target: "reth-bench", number=?block.header.number, "Sending payload to engine");
let (version, params, execution_data) = block_to_new_payload(block, is_optimism)?;
let (version, params) = block_to_new_payload(block, is_optimism)?;
let start = Instant::now();
let reth_data = use_reth_namespace.then_some(execution_data);
let server_timings =
call_new_payload_with_reth(&auth_provider, version, params, reth_data).await?;
call_new_payload(&auth_provider, version, params).await?;
let latency =
server_timings.as_ref().map(|t| t.latency).unwrap_or_else(|| start.elapsed());
let new_payload_result = NewPayloadResult {
gas_used,
latency,
persistence_wait: server_timings.as_ref().and_then(|t| t.persistence_wait),
execution_cache_wait: server_timings
.as_ref()
.map(|t| t.execution_cache_wait)
.unwrap_or_default(),
sparse_trie_wait: server_timings
.as_ref()
.map(|t| t.sparse_trie_wait)
.unwrap_or_default(),
};
let new_payload_result = NewPayloadResult { gas_used, latency: start.elapsed() };
blocks_processed += 1;
let progress = match total_blocks {
Some(total) => format!("{blocks_processed}/{total}"),
@@ -145,12 +121,6 @@ impl Command {
let row =
TotalGasRow { block_number, transaction_count, gas_used, time: current_duration };
results.push((row, new_payload_result));
if let Some(scraper) = metrics_scraper.as_mut() &&
let Err(err) = scraper.scrape_after_block(block_number).await
{
tracing::warn!(target: "reth-bench", %err, block_number, "Failed to scrape metrics");
}
}
// Check if the spawned task encountered an error
@@ -181,10 +151,6 @@ impl Command {
}
writer.flush()?;
if let Some(scraper) = &metrics_scraper {
scraper.write_csv(&path)?;
}
info!(target: "reth-bench", "Finished writing benchmark output files to {:?}.", path);
}

View File

@@ -27,9 +27,6 @@ pub(crate) struct GasRampPayloadFile {
pub(crate) block_hash: B256,
/// The params to pass to newPayload.
pub(crate) params: serde_json::Value,
/// The execution data for `reth_newPayload`.
#[serde(skip_serializing_if = "Option::is_none", default)]
pub(crate) execution_data: Option<alloy_rpc_types_engine::ExecutionData>,
}
/// This represents the results of a single `newPayload` call in the benchmark, containing the gas
@@ -40,12 +37,6 @@ pub(crate) struct NewPayloadResult {
pub(crate) gas_used: u64,
/// The latency of the `newPayload` call.
pub(crate) latency: Duration,
/// Time spent waiting for persistence. `None` when no persistence was in-flight.
pub(crate) persistence_wait: Option<Duration>,
/// Time spent waiting for execution cache lock.
pub(crate) execution_cache_wait: Duration,
/// Time spent waiting for sparse trie lock.
pub(crate) sparse_trie_wait: Duration,
}
impl NewPayloadResult {
@@ -76,12 +67,9 @@ impl Serialize for NewPayloadResult {
{
// convert the time to microseconds
let time = self.latency.as_micros();
let mut state = serializer.serialize_struct("NewPayloadResult", 5)?;
let mut state = serializer.serialize_struct("NewPayloadResult", 2)?;
state.serialize_field("gas_used", &self.gas_used)?;
state.serialize_field("latency", &time)?;
state.serialize_field("persistence_wait", &self.persistence_wait.map(|d| d.as_micros()))?;
state.serialize_field("execution_cache_wait", &self.execution_cache_wait.as_micros())?;
state.serialize_field("sparse_trie_wait", &self.sparse_trie_wait.as_micros())?;
state.end()
}
}
@@ -138,7 +126,7 @@ impl Serialize for CombinedResult {
let fcu_latency = self.fcu_latency.as_micros();
let new_payload_latency = self.new_payload_result.latency.as_micros();
let total_latency = self.total_latency.as_micros();
let mut state = serializer.serialize_struct("CombinedResult", 10)?;
let mut state = serializer.serialize_struct("CombinedResult", 7)?;
// flatten the new payload result because this is meant for CSV writing
state.serialize_field("block_number", &self.block_number)?;
@@ -148,18 +136,6 @@ impl Serialize for CombinedResult {
state.serialize_field("new_payload_latency", &new_payload_latency)?;
state.serialize_field("fcu_latency", &fcu_latency)?;
state.serialize_field("total_latency", &total_latency)?;
state.serialize_field(
"persistence_wait",
&self.new_payload_result.persistence_wait.map(|d| d.as_micros()),
)?;
state.serialize_field(
"execution_cache_wait",
&self.new_payload_result.execution_cache_wait.as_micros(),
)?;
state.serialize_field(
"sparse_trie_wait",
&self.new_payload_result.sparse_trie_wait.as_micros(),
)?;
state.end()
}
}

View File

@@ -15,7 +15,6 @@ use crate::{
authenticated_transport::AuthenticatedTransportConnect,
bench::{
helpers::parse_duration,
metrics_scraper::MetricsScraper,
output::{
write_benchmark_results, CombinedResult, GasRampPayloadFile, NewPayloadResult,
TotalGasOutput, TotalGasRow,
@@ -24,15 +23,12 @@ use crate::{
derive_ws_rpc_url, setup_persistence_subscription, PersistenceWaiter,
},
},
valid_payload::{call_forkchoice_updated_with_reth, call_new_payload_with_reth},
valid_payload::{call_forkchoice_updated, call_new_payload},
};
use alloy_primitives::B256;
use alloy_provider::{network::AnyNetwork, Provider, RootProvider};
use alloy_provider::{ext::EngineApi, network::AnyNetwork, Provider, RootProvider};
use alloy_rpc_client::ClientBuilder;
use alloy_rpc_types_engine::{
CancunPayloadFields, ExecutionData, ExecutionPayloadEnvelopeV4, ExecutionPayloadSidecar,
ForkchoiceState, JwtSecret, PraguePayloadFields,
};
use alloy_rpc_types_engine::{ExecutionPayloadEnvelopeV4, ForkchoiceState, JwtSecret};
use clap::Parser;
use eyre::Context;
use reth_cli_runner::CliContext;
@@ -128,22 +124,6 @@ pub struct Command {
/// If not provided, derives from engine RPC URL by changing scheme to ws and port to 8546.
#[arg(long, value_name = "WS_RPC_URL", verbatim_doc_comment)]
ws_rpc_url: Option<String>,
/// Use `reth_newPayload` endpoint instead of `engine_newPayload*`.
///
/// The `reth_newPayload` endpoint is a reth-specific extension that takes `ExecutionData`
/// directly, waits for persistence and cache updates to complete before processing,
/// and returns server-side timing breakdowns (latency, persistence wait, cache wait).
#[arg(long, default_value = "false", verbatim_doc_comment)]
reth_new_payload: bool,
/// Optional Prometheus metrics endpoint to scrape after each block.
///
/// When provided, reth-bench will fetch metrics from this URL after each
/// payload, recording per-block execution and state root durations.
/// Results are written to `metrics.csv` in the output directory.
#[arg(long = "metrics-url", value_name = "URL", verbatim_doc_comment)]
metrics_url: Option<String>,
}
/// A loaded payload ready for execution.
@@ -183,9 +163,6 @@ impl Command {
self.persistence_threshold
);
}
if self.reth_new_payload {
info!("Using reth_newPayload and reth_forkchoiceUpdated endpoints");
}
// Set up waiter based on configured options
// When both are set: wait at least wait_time, and also wait for persistence if needed
@@ -213,8 +190,6 @@ impl Command {
(None, false) => None,
};
let mut metrics_scraper = MetricsScraper::maybe_new(self.metrics_url.clone());
// Set up authenticated engine provider
let jwt =
std::fs::read_to_string(&self.jwt_secret).wrap_err("Failed to read JWT secret file")?;
@@ -273,28 +248,14 @@ impl Command {
"Executing gas ramp payload (newPayload + FCU)"
);
let reth_data =
if self.reth_new_payload { payload.file.execution_data.clone() } else { None };
let _ = call_new_payload_with_reth(
&auth_provider,
payload.version,
payload.file.params.clone(),
reth_data,
)
.await?;
call_new_payload(&auth_provider, payload.version, payload.file.params.clone()).await?;
let fcu_state = ForkchoiceState {
head_block_hash: payload.file.block_hash,
safe_block_hash: parent_hash,
finalized_block_hash: parent_hash,
};
call_forkchoice_updated_with_reth(
&auth_provider,
payload.version,
fcu_state,
self.reth_new_payload,
)
.await?;
call_forkchoice_updated(&auth_provider, payload.version, fcu_state, None).await?;
info!(target: "reth-bench", gas_ramp_payload = i + 1, "Gas ramp payload executed successfully");
@@ -342,47 +303,20 @@ impl Command {
"Sending newPayload"
);
let params = serde_json::to_value((
execution_payload.clone(),
Vec::<B256>::new(),
B256::ZERO,
envelope.execution_requests.to_vec(),
))?;
let status = auth_provider
.new_payload_v4(
execution_payload.clone(),
vec![],
B256::ZERO,
envelope.execution_requests.to_vec(),
)
.await?;
let reth_data = self.reth_new_payload.then(|| ExecutionData {
payload: execution_payload.clone().into(),
sidecar: ExecutionPayloadSidecar::v4(
CancunPayloadFields {
versioned_hashes: Vec::new(),
parent_beacon_block_root: B256::ZERO,
},
PraguePayloadFields { requests: envelope.execution_requests.clone().into() },
),
});
let new_payload_result = NewPayloadResult { gas_used, latency: start.elapsed() };
let server_timings = call_new_payload_with_reth(
&auth_provider,
EngineApiMessageVersion::V4,
params,
reth_data,
)
.await?;
let np_latency =
server_timings.as_ref().map(|t| t.latency).unwrap_or_else(|| start.elapsed());
let new_payload_result = NewPayloadResult {
gas_used,
latency: np_latency,
persistence_wait: server_timings.as_ref().and_then(|t| t.persistence_wait),
execution_cache_wait: server_timings
.as_ref()
.map(|t| t.execution_cache_wait)
.unwrap_or_default(),
sparse_trie_wait: server_timings
.as_ref()
.map(|t| t.sparse_trie_wait)
.unwrap_or_default(),
};
if !status.is_valid() {
return Err(eyre::eyre!("Payload rejected: {:?}", status));
}
let fcu_state = ForkchoiceState {
head_block_hash: block_hash,
@@ -390,18 +324,12 @@ impl Command {
finalized_block_hash: parent_hash,
};
let fcu_start = Instant::now();
call_forkchoice_updated_with_reth(
&auth_provider,
EngineApiMessageVersion::V4,
fcu_state,
self.reth_new_payload,
)
.await?;
let fcu_latency = fcu_start.elapsed();
debug!(target: "reth-bench", method = "engine_forkchoiceUpdatedV3", ?fcu_state, "Sending forkchoiceUpdated");
let total_latency =
if server_timings.is_some() { np_latency + fcu_latency } else { start.elapsed() };
let fcu_result = auth_provider.fork_choice_updated_v3(fcu_state, None).await?;
let total_latency = start.elapsed();
let fcu_latency = total_latency - new_payload_result.latency;
let combined_result = CombinedResult {
block_number,
@@ -416,12 +344,6 @@ impl Command {
let progress = format!("{}/{}", i + 1, payloads.len());
info!(target: "reth-bench", progress, %combined_result);
if let Some(scraper) = metrics_scraper.as_mut() &&
let Err(err) = scraper.scrape_after_block(block_number).await
{
tracing::warn!(target: "reth-bench", %err, block_number, "Failed to scrape metrics");
}
if let Some(w) = &mut waiter {
w.on_block(block_number).await?;
}
@@ -430,6 +352,7 @@ impl Command {
TotalGasRow { block_number, transaction_count, gas_used, time: current_duration };
results.push((gas_row, combined_result));
debug!(target: "reth-bench", ?status, ?fcu_result, "Payload executed successfully");
parent_hash = block_hash;
}
@@ -444,10 +367,6 @@ impl Command {
write_benchmark_results(path, &gas_output_results, &combined_results)?;
}
if let (Some(path), Some(scraper)) = (&self.output, &metrics_scraper) {
scraper.write_csv(path)?;
}
let gas_output =
TotalGasOutput::with_combined_results(gas_output_results, &combined_results)?;
info!(

View File

@@ -6,14 +6,12 @@ use alloy_eips::eip7685::Requests;
use alloy_primitives::B256;
use alloy_provider::{ext::EngineApi, network::AnyRpcBlock, Network, Provider};
use alloy_rpc_types_engine::{
ExecutionData, ExecutionPayload, ExecutionPayloadInputV2, ExecutionPayloadSidecar,
ForkchoiceState, ForkchoiceUpdated, PayloadAttributes, PayloadStatus,
ExecutionPayload, ExecutionPayloadInputV2, ExecutionPayloadSidecar, ForkchoiceState,
ForkchoiceUpdated, PayloadAttributes, PayloadStatus,
};
use alloy_transport::TransportResult;
use op_alloy_rpc_types_engine::OpExecutionPayloadV4;
use reth_node_api::EngineApiMessageVersion;
use serde::Deserialize;
use std::time::Duration;
use tracing::{debug, error};
/// An extension trait for providers that implement the engine API, to wait for a VALID response.
@@ -163,13 +161,10 @@ where
}
}
/// Converts an RPC block into versioned engine API params and an [`ExecutionData`].
///
/// Returns `(version, versioned_params, execution_data)`.
pub(crate) fn block_to_new_payload(
block: AnyRpcBlock,
is_optimism: bool,
) -> eyre::Result<(EngineApiMessageVersion, serde_json::Value, ExecutionData)> {
) -> eyre::Result<(EngineApiMessageVersion, serde_json::Value)> {
let block = block
.into_inner()
.map_header(|header| header.map(|h| h.into_header_with_defaults()))
@@ -184,19 +179,13 @@ pub(crate) fn block_to_new_payload(
payload_to_new_payload(payload, sidecar, is_optimism, block.withdrawals_root, None)
}
/// Converts an execution payload and sidecar into versioned engine API params and an
/// [`ExecutionData`].
///
/// Returns `(version, versioned_params, execution_data)`.
pub(crate) fn payload_to_new_payload(
payload: ExecutionPayload,
sidecar: ExecutionPayloadSidecar,
is_optimism: bool,
withdrawals_root: Option<B256>,
target_version: Option<EngineApiMessageVersion>,
) -> eyre::Result<(EngineApiMessageVersion, serde_json::Value, ExecutionData)> {
let execution_data = ExecutionData { payload: payload.clone(), sidecar: sidecar.clone() };
) -> eyre::Result<(EngineApiMessageVersion, serde_json::Value)> {
let (version, params) = match payload {
ExecutionPayload::V3(payload) => {
let cancun = sidecar.cancun().unwrap();
@@ -255,7 +244,7 @@ pub(crate) fn payload_to_new_payload(
}
};
Ok((version, params, execution_data))
Ok((version, params))
}
/// Calls the correct `engine_newPayload` method depending on the given [`ExecutionPayload`] and its
@@ -263,109 +252,32 @@ pub(crate) fn payload_to_new_payload(
///
/// # Panics
/// If the given payload is a V3 payload, but a parent beacon block root is provided as `None`.
#[allow(dead_code)]
pub(crate) async fn call_new_payload<N: Network, P: Provider<N>>(
provider: P,
version: EngineApiMessageVersion,
params: serde_json::Value,
) -> TransportResult<Option<NewPayloadTimingBreakdown>> {
call_new_payload_with_reth(provider, version, params, None).await
}
) -> TransportResult<()> {
let method = version.method_name();
/// Response from `reth_newPayload` endpoint, which includes server-measured latency.
#[derive(Debug, Deserialize)]
struct RethPayloadStatus {
#[serde(flatten)]
status: PayloadStatus,
latency_us: u64,
#[serde(default)]
persistence_wait_us: Option<u64>,
#[serde(default)]
execution_cache_wait_us: u64,
#[serde(default)]
sparse_trie_wait_us: u64,
}
debug!(target: "reth-bench", method, "Sending newPayload");
/// Server-side timing breakdown from `reth_newPayload` endpoint.
#[derive(Debug, Clone, Copy, Default)]
pub(crate) struct NewPayloadTimingBreakdown {
/// Server-side execution latency.
pub(crate) latency: Duration,
/// Time spent waiting for persistence. `None` when no persistence was in-flight.
pub(crate) persistence_wait: Option<Duration>,
/// Time spent waiting for execution cache lock.
pub(crate) execution_cache_wait: Duration,
/// Time spent waiting for sparse trie lock.
pub(crate) sparse_trie_wait: Duration,
}
let mut status: PayloadStatus = provider.client().request(method, &params).await?;
/// Calls either `engine_newPayload*` or `reth_newPayload` depending on whether
/// `reth_execution_data` is provided.
///
/// When `reth_execution_data` is `Some`, uses the `reth_newPayload` endpoint which takes
/// `ExecutionData` directly and waits for persistence and cache updates to complete.
///
/// Returns the server-reported timing breakdown when using the reth namespace, or `None` for
/// the standard engine namespace.
pub(crate) async fn call_new_payload_with_reth<N: Network, P: Provider<N>>(
provider: P,
version: EngineApiMessageVersion,
params: serde_json::Value,
reth_execution_data: Option<ExecutionData>,
) -> TransportResult<Option<NewPayloadTimingBreakdown>> {
if let Some(execution_data) = reth_execution_data {
let method = "reth_newPayload";
let reth_params = serde_json::to_value((execution_data.clone(),))
.expect("ExecutionData serialization cannot fail");
debug!(target: "reth-bench", method, "Sending newPayload");
let mut resp: RethPayloadStatus = provider.client().request(method, &reth_params).await?;
while !resp.status.is_valid() {
if resp.status.is_invalid() {
error!(target: "reth-bench", status=?resp.status, "Invalid {method}");
return Err(alloy_json_rpc::RpcError::LocalUsageError(Box::new(
std::io::Error::other(format!("Invalid {method}: {:?}", resp.status)),
)))
}
if resp.status.is_syncing() {
return Err(alloy_json_rpc::RpcError::UnsupportedFeature(
"invalid range: no canonical state found for parent of requested block",
))
}
resp = provider.client().request(method, &reth_params).await?;
while !status.is_valid() {
if status.is_invalid() {
error!(target: "reth-bench", ?status, ?params, "Invalid {method}",);
return Err(alloy_json_rpc::RpcError::LocalUsageError(Box::new(std::io::Error::other(
format!("Invalid {method}: {status:?}"),
))))
}
Ok(Some(NewPayloadTimingBreakdown {
latency: Duration::from_micros(resp.latency_us),
persistence_wait: resp.persistence_wait_us.map(Duration::from_micros),
execution_cache_wait: Duration::from_micros(resp.execution_cache_wait_us),
sparse_trie_wait: Duration::from_micros(resp.sparse_trie_wait_us),
}))
} else {
let method = version.method_name();
debug!(target: "reth-bench", method, "Sending newPayload");
let mut status: PayloadStatus = provider.client().request(method, &params).await?;
while !status.is_valid() {
if status.is_invalid() {
error!(target: "reth-bench", ?status, ?params, "Invalid {method}",);
return Err(alloy_json_rpc::RpcError::LocalUsageError(Box::new(
std::io::Error::other(format!("Invalid {method}: {status:?}")),
)))
}
if status.is_syncing() {
return Err(alloy_json_rpc::RpcError::UnsupportedFeature(
"invalid range: no canonical state found for parent of requested block",
))
}
status = provider.client().request(method, &params).await?;
if status.is_syncing() {
return Err(alloy_json_rpc::RpcError::UnsupportedFeature(
"invalid range: no canonical state found for parent of requested block",
))
}
Ok(None)
status = provider.client().request(method, &params).await?;
}
Ok(())
}
/// Calls the correct `engine_forkchoiceUpdated` method depending on the given
@@ -392,47 +304,3 @@ pub(crate) async fn call_forkchoice_updated<N, P: EngineApiValidWaitExt<N>>(
}
}
}
/// Calls either `reth_forkchoiceUpdated` or the standard `engine_forkchoiceUpdated*` depending
/// on `use_reth`.
///
/// When `use_reth` is true, uses the `reth_forkchoiceUpdated` endpoint which sends a regular FCU
/// with no payload attributes.
pub(crate) async fn call_forkchoice_updated_with_reth<
N: Network,
P: Provider<N> + EngineApiValidWaitExt<N>,
>(
provider: P,
message_version: EngineApiMessageVersion,
forkchoice_state: ForkchoiceState,
use_reth: bool,
) -> TransportResult<ForkchoiceUpdated> {
if use_reth {
let method = "reth_forkchoiceUpdated";
let reth_params = serde_json::to_value((forkchoice_state,))
.expect("ForkchoiceState serialization cannot fail");
debug!(target: "reth-bench", method, "Sending forkchoiceUpdated");
let mut resp: ForkchoiceUpdated = provider.client().request(method, &reth_params).await?;
while !resp.is_valid() {
if resp.is_invalid() {
error!(target: "reth-bench", ?resp, "Invalid {method}");
return Err(alloy_json_rpc::RpcError::LocalUsageError(Box::new(
std::io::Error::other(format!("Invalid {method}: {resp:?}")),
)))
}
if resp.is_syncing() {
return Err(alloy_json_rpc::RpcError::UnsupportedFeature(
"invalid range: no canonical state found for parent of requested block",
))
}
resp = provider.client().request(method, &reth_params).await?;
}
Ok(resp)
} else {
call_forkchoice_updated(provider, message_version, forkchoice_state, None).await
}
}

View File

@@ -30,8 +30,7 @@ workspace = true
# reth
reth-ethereum-cli.workspace = true
reth-chainspec.workspace = true
reth-primitives-traits.workspace = true
reth-ethereum-primitives.workspace = true
reth-primitives.workspace = true
reth-db = { workspace = true, features = ["mdbx"] }
reth-provider.workspace = true
reth-revm.workspace = true
@@ -111,6 +110,7 @@ dev = ["reth-ethereum-cli/dev"]
asm-keccak = [
"reth-node-core/asm-keccak",
"reth-primitives/asm-keccak",
"reth-ethereum-cli/asm-keccak",
"reth-node-ethereum/asm-keccak",
"alloy-primitives/asm-keccak",
@@ -190,7 +190,6 @@ min-trace-logs = [
"reth-node-core/min-trace-logs",
]
trie-debug = ["reth-node-builder/trie-debug", "reth-node-core/trie-debug"]
rocksdb = ["reth-ethereum-cli/rocksdb", "reth-node-core/rocksdb"]
edge = ["rocksdb"]

View File

@@ -124,11 +124,9 @@ pub mod providers {
pub use reth_provider::*;
}
/// Re-exported primitives.
#[allow(ambiguous_glob_reexports)]
/// Re-exported from `reth_primitives`.
pub mod primitives {
pub use reth_ethereum_primitives::*;
pub use reth_primitives_traits::*;
pub use reth_primitives::*;
}
/// Re-exported from `reth_ethereum_consensus`.

View File

@@ -1061,6 +1061,14 @@ mod tests {
) -> ProviderResult<Option<StorageValue>> {
Ok(None)
}
fn storage_by_hashed_key(
&self,
_address: Address,
_hashed_storage_key: StorageKey,
) -> ProviderResult<Option<StorageValue>> {
Ok(None)
}
}
impl BytecodeReader for MockStateProvider {

View File

@@ -223,6 +223,26 @@ impl<N: NodePrimitives> StateProvider for MemoryOverlayStateProviderRef<'_, N> {
self.historical.storage(address, storage_key)
}
fn storage_by_hashed_key(
&self,
address: Address,
hashed_storage_key: StorageKey,
) -> ProviderResult<Option<StorageValue>> {
let hashed_address = keccak256(address);
let state = &self.trie_input().state;
if let Some(hs) = state.storages.get(&hashed_address) {
if let Some(value) = hs.storage.get(&hashed_storage_key) {
return Ok(Some(*value));
}
if hs.wiped {
return Ok(Some(StorageValue::ZERO));
}
}
self.historical.storage_by_hashed_key(address, hashed_storage_key)
}
}
impl<N: NodePrimitives> BytecodeReader for MemoryOverlayStateProviderRef<'_, N> {

View File

@@ -132,6 +132,6 @@ impl<H: BlockHeader> EthChainSpec for ChainSpec<H> {
}
fn final_paris_total_difficulty(&self) -> Option<U256> {
self.get_final_paris_total_difficulty()
self.paris_block_and_final_difficulty.map(|(_, final_difficulty)| final_difficulty)
}
}

View File

@@ -18,5 +18,6 @@ alloy-genesis.workspace = true
# misc
clap.workspace = true
shellexpand.workspace = true
eyre.workspace = true
serde_json.workspace = true

View File

@@ -73,7 +73,7 @@ pub trait ChainSpecParser: Clone + Send + Sync + 'static {
/// A helper to parse a [`Genesis`](alloy_genesis::Genesis) as argument or from disk.
pub fn parse_genesis(s: &str) -> eyre::Result<alloy_genesis::Genesis> {
// try to read json from path first
let raw = match fs::read_to_string(PathBuf::from(s)) {
let raw = match fs::read_to_string(PathBuf::from(shellexpand::full(s)?.into_owned())) {
Ok(raw) => raw,
Err(io_err) => {
// valid json may start with "\n", but must contain "{"

View File

@@ -43,7 +43,7 @@ reth-node-metrics.workspace = true
reth-ethereum-primitives = { workspace = true, optional = true }
reth-provider.workspace = true
reth-prune.workspace = true
reth-prune-types.workspace = true
reth-prune-types = { workspace = true, optional = true }
reth-revm.workspace = true
reth-stages.workspace = true
reth-stages-types = { workspace = true, optional = true }
@@ -53,7 +53,7 @@ reth-tasks.workspace = true
reth-storage-api.workspace = true
reth-trie = { workspace = true, features = ["metrics"] }
reth-trie-db = { workspace = true, features = ["metrics"] }
reth-trie-common = { workspace = true, optional = true }
reth-trie-common.workspace = true
reth-primitives-traits.workspace = true
reth-discv4.workspace = true
reth-discv5.workspace = true
@@ -113,7 +113,6 @@ arbitrary = [
"dep:proptest",
"dep:arbitrary",
"dep:proptest-arbitrary-interop",
"dep:reth-trie-common",
"reth-db-api/arbitrary",
"reth-eth-wire/arbitrary",
"reth-db/arbitrary",
@@ -124,11 +123,11 @@ arbitrary = [
"reth-codecs/test-utils",
"reth-prune-types/test-utils",
"reth-stages-types/test-utils",
"reth-trie-common?/test-utils",
"reth-trie-common/test-utils",
"reth-codecs/arbitrary",
"reth-prune-types/arbitrary",
"reth-prune-types?/arbitrary",
"reth-stages-types?/arbitrary",
"reth-trie-common?/arbitrary",
"reth-trie-common/arbitrary",
"alloy-consensus/arbitrary",
"reth-primitives-traits/arbitrary",
"reth-ethereum-primitives/arbitrary",

View File

@@ -89,15 +89,13 @@ impl<C: ChainSpecParser> EnvironmentArgs<C> {
/// Initializes environment according to [`AccessRights`] and returns an instance of
/// [`Environment`].
///
/// The provided `runtime` is used for parallel storage I/O.
pub fn init<N: CliNodeTypes>(
&self,
access: AccessRights,
runtime: reth_tasks::Runtime,
) -> eyre::Result<Environment<N>>
/// Internally builds a [`reth_tasks::Runtime`] attached to the current tokio handle for
/// parallel storage I/O.
pub fn init<N: CliNodeTypes>(&self, access: AccessRights) -> eyre::Result<Environment<N>>
where
C: ChainSpecParser<ChainSpec = N::ChainSpec>,
{
let runtime = reth_tasks::Runtime::with_existing_handle(tokio::runtime::Handle::current())?;
let data_dir = self.datadir.clone().resolve_datadir(self.chain.chain());
let db_path = data_dir.db();
let sf_path = data_dir.static_files();
@@ -146,24 +144,11 @@ impl<C: ChainSpecParser> EnvironmentArgs<C> {
})
}
};
let rocksdb_provider = if !access.is_read_write() && !RocksDBProvider::exists(&rocksdb_path)
{
// RocksDB database doesn't exist yet (e.g. datadir restored from a snapshot
// or created before RocksDB storage). Create an empty one so read-only
// commands can proceed.
debug!(target: "reth::cli", ?rocksdb_path, "RocksDB not found, initializing empty database");
reth_fs_util::create_dir_all(&rocksdb_path)?;
RocksDBProvider::builder(data_dir.rocksdb())
.with_default_tables()
.with_database_log_level(self.db.log_level)
.build()?
} else {
RocksDBProvider::builder(data_dir.rocksdb())
.with_default_tables()
.with_database_log_level(self.db.log_level)
.with_read_only(!access.is_read_write())
.build()?
};
let rocksdb_provider = RocksDBProvider::builder(data_dir.rocksdb())
.with_default_tables()
.with_database_log_level(self.db.log_level)
.with_read_only(!access.is_read_write())
.build()?;
let provider_factory =
self.create_provider_factory(&config, db, sfp, rocksdb_provider, access, runtime)?;

View File

@@ -14,7 +14,7 @@ use reth_db_api::{
use reth_db_common::DbTool;
use reth_node_builder::{NodeTypesWithDB, NodeTypesWithDBAdapter};
use reth_provider::{providers::ProviderNodeTypes, DBProvider, StaticFileProviderFactory};
use reth_static_file_types::{ChangesetOffset, StaticFileSegment};
use reth_static_file_types::StaticFileSegment;
use std::{
hash::{BuildHasher, Hasher},
time::{Duration, Instant},
@@ -134,12 +134,12 @@ fn checksum_static_file<N: CliNodeTypes<ChainSpec: EthereumHardforks>>(
.ok_or_else(|| eyre::eyre!("No static files found for segment: {}", segment))?;
let start_time = Instant::now();
let mut hasher = checksum_hasher();
let mut total = 0usize;
let limit = limit.unwrap_or(usize::MAX);
let mut checksummer = Checksummer::new(checksum_hasher(), limit);
let start_block = start_block.unwrap_or(0);
let end_block = end_block.unwrap_or(u64::MAX);
let is_change_based = segment.is_change_based();
info!(
"Computing checksum for {} static files, start_block={}, end_block={}, limit={:?}",
@@ -149,8 +149,7 @@ fn checksum_static_file<N: CliNodeTypes<ChainSpec: EthereumHardforks>>(
if limit == usize::MAX { None } else { Some(limit) }
);
let mut reached_limit = false;
for (block_range, _header) in ranges.iter().sorted_by_key(|(range, _)| range.start()) {
'outer: for (block_range, _header) in ranges.iter().sorted_by_key(|(range, _)| range.start()) {
if block_range.end() < start_block || block_range.start() > end_block {
continue;
}
@@ -168,42 +167,28 @@ fn checksum_static_file<N: CliNodeTypes<ChainSpec: EthereumHardforks>>(
let mut cursor = jar_provider.cursor()?;
if is_change_based {
let offsets = jar_provider.read_changeset_offsets()?.ok_or_else(|| {
eyre::eyre!(
"Missing changeset offsets sidecar for segment {} at range {}",
segment,
block_range
)
})?;
let input = ChangeBasedChecksumInput {
segment,
block_range_start: block_range.start(),
start_block,
end_block,
offsets: &offsets,
};
while let Ok(Some(row)) = cursor.next_row() {
for col_data in row.iter() {
hasher.write(col_data);
}
reached_limit = checksum_change_based_segment(&mut checksummer, input, &mut cursor)?;
} else {
while let Some(row) = cursor.next_row()? {
if checksummer.write_row(&row) {
reached_limit = true;
break;
}
total += 1;
if total.is_multiple_of(PROGRESS_LOG_INTERVAL) {
info!("Hashed {total} entries.");
}
if total >= limit {
break 'outer;
}
}
// Explicitly drop provider before removing from cache to avoid deadlock
drop(jar_provider);
static_file_provider.remove_cached_provider(segment, fixed_block_range.end());
if reached_limit {
break;
}
}
let (checksum, total) = checksummer.finish();
let checksum = hasher.finish();
let elapsed = start_time.elapsed();
info!(
@@ -282,7 +267,7 @@ impl<N: ProviderNodeTypes> TableViewer<(u64, Duration)> for ChecksumViewer<'_, N
total = index + 1;
if total >= limit {
break;
break
}
}
@@ -300,139 +285,3 @@ impl<N: ProviderNodeTypes> TableViewer<(u64, Duration)> for ChecksumViewer<'_, N
Ok((checksum, elapsed))
}
}
/// Accumulates a checksum over key-value entries, tracking count and limit.
struct Checksummer<H> {
hasher: H,
total: usize,
limit: usize,
}
impl<H: Hasher> Checksummer<H> {
fn new(hasher: H, limit: usize) -> Self {
Self { hasher, total: 0, limit }
}
/// Hash a row's columns (non-changeset segments). Returns `true` if the limit is reached.
fn write_row(&mut self, row: &[&[u8]]) -> bool {
for col in row {
self.hasher.write(col);
}
self.advance()
}
/// Hash a key + value as two separate writes, matching MDBX raw entry semantics.
/// Write boundaries matter: foldhash rotates its accumulator by `len` on each `write`.
fn write_entry(&mut self, key: &[u8], value: &[u8]) -> bool {
self.hasher.write(key);
self.hasher.write(value);
self.advance()
}
fn advance(&mut self) -> bool {
self.total += 1;
if self.total.is_multiple_of(PROGRESS_LOG_INTERVAL) {
info!("Hashed {} entries.", self.total);
}
self.total >= self.limit
}
fn finish(self) -> (u64, usize) {
(self.hasher.finish(), self.total)
}
}
/// Reconstruct MDBX `StorageChangeSets` key/value boundaries from a static-file row.
///
/// MDBX layout:
/// - key: `BlockNumberAddress` => `[8B block_number][20B address]`
/// - value: `StorageEntry` => `[32B storage_key][compact U256 value]`
///
/// Static-file row layout for `StorageBeforeTx`:
/// - `[20B address][32B storage_key][compact U256 value]`
fn split_storage_changeset_row(block_number: u64, row: &[u8]) -> eyre::Result<([u8; 28], &[u8])> {
if row.len() < 20 {
return Err(eyre::eyre!(
"Storage changeset row too short: expected at least 20 bytes, got {}",
row.len()
));
}
let mut key_buf = [0u8; 28];
key_buf[..8].copy_from_slice(&block_number.to_be_bytes());
key_buf[8..].copy_from_slice(&row[..20]);
Ok((key_buf, &row[20..]))
}
struct ChangeBasedChecksumInput<'a> {
segment: StaticFileSegment,
block_range_start: u64,
start_block: u64,
end_block: u64,
offsets: &'a [ChangesetOffset],
}
fn checksum_change_based_segment<H: Hasher>(
checksummer: &mut Checksummer<H>,
input: ChangeBasedChecksumInput<'_>,
cursor: &mut reth_db::static_file::StaticFileCursor<'_>,
) -> eyre::Result<bool> {
let ChangeBasedChecksumInput { segment, block_range_start, start_block, end_block, offsets } =
input;
let is_storage = segment.is_storage_change_sets();
let mut reached_limit = false;
for (offset_index, offset) in offsets.iter().enumerate() {
let block_number = block_range_start + offset_index as u64;
let include = block_number >= start_block && block_number <= end_block;
for _ in 0..offset.num_changes() {
let row = cursor.next_row()?.ok_or_else(|| {
eyre::eyre!(
"Unexpected EOF while checksumming {} static file at range starting {}",
segment,
block_range_start
)
})?;
if !include {
continue;
}
// Reconstruct MDBX key/value write boundaries. foldhash rotates
// its accumulator by `len` on each write(), so boundaries must
// match exactly.
let done = if is_storage {
// StorageChangeSets: MDBX key = BlockNumberAddress (28B),
// value = compact StorageEntry. Column 0 is compact
// StorageBeforeTx = [20B address][32B key][compact U256].
let col = row[0];
let (key, value) = split_storage_changeset_row(block_number, col)?;
checksummer.write_entry(&key, value)
} else {
// AccountChangeSets: MDBX key = BlockNumber (8B),
// value = compact AccountBeforeTx (= column 0).
checksummer.write_entry(&block_number.to_be_bytes(), row[0])
};
if done {
reached_limit = true;
break;
}
}
if reached_limit {
break;
}
}
if !reached_limit && cursor.next_row()?.is_some() {
return Err(eyre::eyre!(
"Changeset offsets do not cover all rows for {} at range starting {}",
segment,
block_range_start
));
}
Ok(reached_limit)
}

View File

@@ -98,7 +98,7 @@ impl Command {
)?;
if let Some(entry) = entry {
let se: reth_primitives_traits::StorageEntry = entry;
let se: reth_primitives_traits::StorageEntry = entry.into();
println!("{}", serde_json::to_string_pretty(&se)?);
} else {
error!(target: "reth::cli", "No content for the given table key.");
@@ -110,7 +110,7 @@ impl Command {
let serializable: Vec<_> = changesets
.into_iter()
.map(|(addr, entry)| {
let se: reth_primitives_traits::StorageEntry = entry;
let se: reth_primitives_traits::StorageEntry = entry.into();
(addr, se)
})
.collect();

View File

@@ -16,12 +16,10 @@ mod copy;
mod diff;
mod get;
mod list;
mod prune_checkpoints;
mod repair_trie;
mod settings;
mod state;
mod static_file_header;
mod static_files;
mod stats;
/// DB List TUI
mod tui;
@@ -63,16 +61,12 @@ pub enum Subcommands {
RepairTrie(repair_trie::Command),
/// Reads and displays the static file segment header
StaticFileHeader(static_file_header::Command),
/// Static file operations (split, etc.)
StaticFiles(static_files::Command),
/// Lists current and local database versions
Version,
/// Returns the full database path
Path,
/// Manage storage settings
Settings(settings::Command),
/// View or set prune checkpoints
PruneCheckpoints(prune_checkpoints::Command),
/// Gets storage size information for an account
AccountStorage(account_storage::Command),
/// Gets account state and storage at a specific block
@@ -89,8 +83,7 @@ impl<C: ChainSpecParser<ChainSpec: EthChainSpec + EthereumHardforks>> Command<C>
/// provided command.
macro_rules! db_exec {
($env:expr, $tool:ident, $N:ident, $access_rights:expr, $command:block) => {
let Environment { provider_factory, .. } =
$env.init::<$N>($access_rights, ctx.task_executor.clone())?;
let Environment { provider_factory, .. } = $env.init::<$N>($access_rights)?;
let $tool = DbTool::new(provider_factory)?;
$command;
@@ -188,11 +181,6 @@ impl<C: ChainSpecParser<ChainSpec: EthChainSpec + EthereumHardforks>> Command<C>
command.execute(&tool)?;
});
}
Subcommands::StaticFiles(command) => {
db_exec!(self.env, tool, N, AccessRights::RO, {
command.execute(&tool)?;
});
}
Subcommands::Version => {
let local_db_version = match get_db_version(&db_path) {
Ok(version) => Some(version),
@@ -216,11 +204,6 @@ impl<C: ChainSpecParser<ChainSpec: EthChainSpec + EthereumHardforks>> Command<C>
command.execute(&tool)?;
});
}
Subcommands::PruneCheckpoints(command) => {
db_exec!(self.env, tool, N, command.access_rights(), {
command.execute(&tool)?;
});
}
Subcommands::AccountStorage(command) => {
db_exec!(self.env, tool, N, AccessRights::RO, {
command.execute(&tool)?;

View File

@@ -1,221 +0,0 @@
//! `reth db prune-checkpoints` command for viewing and setting prune checkpoint values.
use clap::{Args, Parser, Subcommand, ValueEnum};
use reth_db_common::DbTool;
use reth_provider::{providers::ProviderNodeTypes, DBProvider, DatabaseProviderFactory};
use reth_prune_types::{PruneCheckpoint, PruneMode, PruneSegment};
use reth_storage_api::{PruneCheckpointReader, PruneCheckpointWriter};
use crate::common::AccessRights;
/// `reth db prune-checkpoints` subcommand
#[derive(Debug, Parser)]
pub struct Command {
#[command(subcommand)]
command: Subcommands,
}
impl Command {
/// Returns database access rights required for the command.
pub fn access_rights(&self) -> AccessRights {
match &self.command {
Subcommands::Get { .. } => AccessRights::RO,
Subcommands::Set(_) => AccessRights::RW,
}
}
}
#[derive(Debug, Subcommand)]
enum Subcommands {
/// Get prune checkpoint(s) from database.
///
/// Shows the current prune progress for each segment, including the highest
/// pruned block/tx number and the active prune mode.
Get {
/// Specific segment to query. If omitted, shows all segments.
#[arg(long, value_enum)]
segment: Option<SegmentArg>,
},
/// Set a prune checkpoint for a segment.
///
/// WARNING: Manually setting checkpoints can cause data inconsistencies.
/// Only use this if you know what you're doing (e.g., recovering from a
/// corrupted checkpoint or forcing a re-prune from a specific block).
Set(SetArgs),
}
/// Arguments for the `set` subcommand
#[derive(Debug, Args)]
pub struct SetArgs {
/// The prune segment to update
#[arg(long, value_enum)]
segment: SegmentArg,
/// Highest pruned block number
#[arg(long)]
block_number: Option<u64>,
/// Highest pruned transaction number
#[arg(long)]
tx_number: Option<u64>,
/// Prune mode to write: full, distance, or before
#[arg(long, value_enum)]
mode: PruneModeArg,
/// Value for distance or before mode (required unless mode is full)
#[arg(long, required_if_eq_any([("mode", "distance"), ("mode", "before")]))]
mode_value: Option<u64>,
}
/// CLI-friendly prune segment names (excludes deprecated variants)
#[derive(Debug, Clone, Copy, ValueEnum)]
#[clap(rename_all = "kebab-case")]
pub enum SegmentArg {
SenderRecovery,
TransactionLookup,
Receipts,
ContractLogs,
AccountHistory,
StorageHistory,
Bodies,
}
impl From<SegmentArg> for PruneSegment {
fn from(arg: SegmentArg) -> Self {
match arg {
SegmentArg::SenderRecovery => Self::SenderRecovery,
SegmentArg::TransactionLookup => Self::TransactionLookup,
SegmentArg::Receipts => Self::Receipts,
SegmentArg::ContractLogs => Self::ContractLogs,
SegmentArg::AccountHistory => Self::AccountHistory,
SegmentArg::StorageHistory => Self::StorageHistory,
SegmentArg::Bodies => Self::Bodies,
}
}
}
/// CLI-friendly prune mode
#[derive(Debug, Clone, Copy, ValueEnum)]
#[clap(rename_all = "kebab-case")]
pub enum PruneModeArg {
/// Prune all blocks
Full,
/// Keep the last N blocks (requires --mode-value)
Distance,
/// Prune blocks before a specific block number (requires --mode-value)
Before,
}
impl Command {
/// Execute the command
pub fn execute<N: ProviderNodeTypes>(self, tool: &DbTool<N>) -> eyre::Result<()> {
match self.command {
Subcommands::Get { segment } => Self::get(tool, segment),
Subcommands::Set(args) => Self::set(tool, args),
}
}
fn get<N: ProviderNodeTypes>(
tool: &DbTool<N>,
segment: Option<SegmentArg>,
) -> eyre::Result<()> {
let provider = tool.provider_factory.provider()?;
match segment {
Some(seg) => {
let segment: PruneSegment = seg.into();
match provider.get_prune_checkpoint(segment)? {
Some(checkpoint) => print_checkpoint(segment, &checkpoint),
None => println!("No checkpoint found for {segment}"),
}
}
None => {
let mut checkpoints = provider.get_prune_checkpoints()?;
checkpoints.sort_by_key(|(seg, _)| *seg);
if checkpoints.is_empty() {
println!("No prune checkpoints found.");
} else {
println!(
"{:<25} {:>15} {:>15} {:>20}",
"Segment", "Block Number", "Tx Number", "Prune Mode"
);
println!("{}", "-".repeat(80));
for (segment, checkpoint) in &checkpoints {
println!(
"{:<25} {:>15} {:>15} {:>20}",
segment.to_string(),
fmt_opt(checkpoint.block_number),
fmt_opt(checkpoint.tx_number),
fmt_mode(&checkpoint.prune_mode),
);
}
}
}
}
Ok(())
}
fn set<N: ProviderNodeTypes>(tool: &DbTool<N>, args: SetArgs) -> eyre::Result<()> {
eyre::ensure!(
args.block_number.is_some() || args.tx_number.is_some(),
"at least one of --block-number or --tx-number must be provided"
);
let prune_mode = match args.mode {
PruneModeArg::Full => PruneMode::Full,
PruneModeArg::Distance => PruneMode::Distance(
args.mode_value
.ok_or_else(|| eyre::eyre!("--mode-value is required for distance mode"))?,
),
PruneModeArg::Before => PruneMode::Before(
args.mode_value
.ok_or_else(|| eyre::eyre!("--mode-value is required for before mode"))?,
),
};
let segment: PruneSegment = args.segment.into();
let checkpoint = PruneCheckpoint {
block_number: args.block_number,
tx_number: args.tx_number,
prune_mode,
};
let provider_rw = tool.provider_factory.database_provider_rw()?;
// Show previous value if any
if let Some(prev) = provider_rw.get_prune_checkpoint(segment)? {
println!("Previous checkpoint for {segment}:");
print_checkpoint(segment, &prev);
println!();
}
provider_rw.save_prune_checkpoint(segment, checkpoint)?;
provider_rw.commit()?;
println!("Updated checkpoint for {segment}:");
print_checkpoint(segment, &checkpoint);
Ok(())
}
}
fn print_checkpoint(segment: PruneSegment, checkpoint: &PruneCheckpoint) {
println!(" Segment: {segment}");
println!(" Block Number: {}", fmt_opt(checkpoint.block_number));
println!(" Tx Number: {}", fmt_opt(checkpoint.tx_number));
println!(" Prune Mode: {}", fmt_mode(&checkpoint.prune_mode));
}
fn fmt_opt(val: Option<u64>) -> String {
val.map_or("-".to_string(), |n| n.to_string())
}
fn fmt_mode(mode: &PruneMode) -> String {
match mode {
PruneMode::Full => "Full".to_string(),
PruneMode::Distance(d) => format!("Distance({d})"),
PruneMode::Before(b) => format!("Before({b})"),
}
}

View File

@@ -5,6 +5,7 @@ use reth_cli_util::parse_socket_address;
use reth_db_api::{
cursor::{DbCursorRO, DbCursorRW, DbDupCursorRO},
database::Database,
tables,
transaction::{DbTx, DbTxMut},
};
use reth_db_common::DbTool;
@@ -20,15 +21,13 @@ use reth_node_metrics::{
};
use reth_provider::{providers::ProviderNodeTypes, ChainSpecProvider, StageCheckpointReader};
use reth_stages::StageId;
use reth_storage_api::StorageSettingsCache;
use reth_tasks::TaskExecutor;
use reth_trie::{
verify::{Output, Verifier},
Nibbles,
};
use reth_trie_db::{
DatabaseHashedCursorFactory, DatabaseTrieCursorFactory, StorageTrieEntryLike, TrieTableAdapter,
};
use reth_trie_common::{StorageTrieEntry, StoredNibbles, StoredNibblesSubKey};
use reth_trie_db::{DatabaseHashedCursorFactory, DatabaseTrieCursorFactory};
use std::{
net::SocketAddr,
time::{Duration, Instant},
@@ -117,13 +116,9 @@ fn verify_only<N: ProviderNodeTypes>(tool: &DbTool<N>) -> eyre::Result<()> {
let mut tx = db.tx()?;
tx.disable_long_read_transaction_safety();
reth_trie_db::with_adapter!(tool.provider_factory, |A| do_verify_only::<_, A>(&tx))
}
fn do_verify_only<TX: DbTx, A: TrieTableAdapter>(tx: &TX) -> eyre::Result<()> {
// Create the verifier
let hashed_cursor_factory = DatabaseHashedCursorFactory::new(tx);
let trie_cursor_factory = DatabaseTrieCursorFactory::<_, A>::new(tx);
let hashed_cursor_factory = DatabaseHashedCursorFactory::new(&tx);
let trie_cursor_factory = DatabaseTrieCursorFactory::new(&tx);
let verifier = Verifier::new(&trie_cursor_factory, hashed_cursor_factory)?;
let metrics = RepairTrieMetrics::new();
@@ -214,37 +209,17 @@ fn verify_and_repair<N: ProviderNodeTypes>(tool: &DbTool<N>) -> eyre::Result<()>
// Check that a pipeline sync isn't in progress.
verify_checkpoints(provider_rw.as_ref())?;
let inconsistent_nodes = reth_trie_db::with_adapter!(tool.provider_factory, |A| {
do_verify_and_repair::<_, A>(&mut provider_rw)?
});
if inconsistent_nodes == 0 {
info!("No inconsistencies found");
} else {
provider_rw.commit()?;
info!("Repaired {} inconsistencies and committed changes", inconsistent_nodes);
}
Ok(())
}
fn do_verify_and_repair<N: ProviderNodeTypes, A: TrieTableAdapter>(
provider_rw: &mut reth_provider::DatabaseProviderRW<N::DB, N>,
) -> eyre::Result<usize>
where
<N::DB as reth_db_api::database::Database>::TXMut: DbTxMut + DbTx,
{
// Create cursors for making modifications with
let tx = provider_rw.tx_mut();
tx.disable_long_read_transaction_safety();
let mut account_trie_cursor = tx.cursor_write::<A::AccountTrieTable>()?;
let mut storage_trie_cursor = tx.cursor_dup_write::<A::StorageTrieTable>()?;
let mut account_trie_cursor = tx.cursor_write::<tables::AccountsTrie>()?;
let mut storage_trie_cursor = tx.cursor_dup_write::<tables::StoragesTrie>()?;
// Create the cursor factories. These cannot accept the `&mut` tx above because they
// require it to be AsRef.
// Create the cursor factories. These cannot accept the `&mut` tx above because they require it
// to be AsRef.
let tx = provider_rw.tx_ref();
let hashed_cursor_factory = DatabaseHashedCursorFactory::new(tx);
let trie_cursor_factory = DatabaseTrieCursorFactory::<_, A>::new(tx);
let trie_cursor_factory = DatabaseTrieCursorFactory::new(tx);
// Create the verifier
let verifier = Verifier::new(&trie_cursor_factory, hashed_cursor_factory)?;
@@ -282,17 +257,17 @@ where
match output {
Output::AccountExtra(path, _node) => {
// Extra account node in trie, remove it
let key: A::AccountKey = path.into();
if account_trie_cursor.seek_exact(key)?.is_some() {
let nibbles = StoredNibbles(path);
if account_trie_cursor.seek_exact(nibbles)?.is_some() {
account_trie_cursor.delete_current()?;
}
}
Output::StorageExtra(account, path, _node) => {
// Extra storage node in trie, remove it
let subkey: A::StorageSubKey = path.into();
let nibbles = StoredNibblesSubKey(path);
if storage_trie_cursor
.seek_by_key_subkey(account, subkey.clone())?
.filter(|e| *e.nibbles() == subkey)
.seek_by_key_subkey(account, nibbles.clone())?
.filter(|e| e.nibbles == nibbles)
.is_some()
{
storage_trie_cursor.delete_current()?;
@@ -301,23 +276,23 @@ where
Output::AccountWrong { path, expected: node, .. } |
Output::AccountMissing(path, node) => {
// Wrong/missing account node value, upsert it
let key: A::AccountKey = path.into();
account_trie_cursor.upsert(key, &node)?;
let nibbles = StoredNibbles(path);
account_trie_cursor.upsert(nibbles, &node)?;
}
Output::StorageWrong { account, path, expected: node, .. } |
Output::StorageMissing(account, path, node) => {
// Wrong/missing storage node value, upsert it
// (We can't just use `upsert` method with a dup cursor, it's not properly
// supported)
let subkey: A::StorageSubKey = path.into();
let entry = A::StorageValue::new(subkey.clone(), node);
let nibbles = StoredNibblesSubKey(path);
if storage_trie_cursor
.seek_by_key_subkey(account, subkey.clone())?
.filter(|v| *v.nibbles() == subkey)
.seek_by_key_subkey(account, nibbles.clone())?
.filter(|v| v.nibbles == nibbles)
.is_some()
{
storage_trie_cursor.delete_current()?;
}
let entry = StorageTrieEntry { nibbles, node };
storage_trie_cursor.upsert(account, &entry)?;
}
Output::Progress(path) => {
@@ -329,7 +304,14 @@ where
}
}
Ok(inconsistent_nodes as usize)
if inconsistent_nodes == 0 {
info!("No inconsistencies found");
} else {
provider_rw.commit()?;
info!("Repaired {} inconsistencies and committed changes", inconsistent_nodes);
}
Ok(())
}
/// Output progress information based on the last seen account path.

View File

@@ -1,31 +0,0 @@
//! Static file related CLI commands
mod split;
pub use split::SplitCommand;
use clap::{Parser, Subcommand};
use reth_db_common::DbTool;
use reth_provider::providers::ProviderNodeTypes;
/// Static files subcommands
#[derive(Debug, Parser)]
pub struct Command {
#[command(subcommand)]
command: Subcommands,
}
#[derive(Debug, Subcommand)]
enum Subcommands {
/// Split static files into new files with different blocks-per-file setting
Split(SplitCommand),
}
impl Command {
/// Execute the static files command
pub fn execute<N: ProviderNodeTypes>(self, tool: &DbTool<N>) -> eyre::Result<()> {
match self.command {
Subcommands::Split(cmd) => cmd.execute(tool),
}
}
}

View File

@@ -1,703 +0,0 @@
use clap::Parser;
use reth_codecs::Compact;
use reth_db::{
cursor::DbCursorRO,
static_file::{
AccountChangesetMask, BlockHashMask, HeaderMask, ReceiptMask, StorageChangesetMask,
TotalDifficultyMask, TransactionMask, TransactionSenderMask,
},
tables,
transaction::DbTx,
};
use reth_db_api::models::CompactU256;
use reth_db_common::DbTool;
use reth_primitives_traits::NodePrimitives;
use reth_provider::{
providers::{ProviderNodeTypes, StaticFileProvider},
StaticFileProviderBuilder, StaticFileProviderFactory, StaticFileWriter,
};
use reth_static_file_types::StaticFileSegment;
use std::path::PathBuf;
use tracing::info;
/// Split static files into new files with different blocks-per-file setting
#[derive(Debug, Parser)]
pub struct SplitCommand {
/// Source static files directory.
/// If not specified, uses the datadir's static_files directory.
#[arg(long, value_name = "PATH")]
static_files_dir: Option<PathBuf>,
/// Output directory for the new static files.
/// Required unless --in-place is specified.
#[arg(long, value_name = "PATH", required_unless_present = "in_place")]
output_dir: Option<PathBuf>,
/// Number of blocks per output file
#[arg(long, value_name = "NUM")]
blocks_per_file: u64,
/// Segments to split (default: all)
#[arg(long, value_delimiter = ',')]
segments: Option<Vec<StaticFileSegment>>,
/// Start block number (default: 0)
#[arg(long)]
from_block: Option<u64>,
/// End block number (default: highest available)
#[arg(long)]
to_block: Option<u64>,
/// Print what would be done without writing
#[arg(long)]
dry_run: bool,
/// Split in-place: write to temp dir, verify, then atomically swap.
/// Original files are preserved in static_files.bak
#[arg(long, conflicts_with = "output_dir")]
in_place: bool,
/// Skip verification step when using --in-place
#[arg(long, requires = "in_place")]
skip_verify: bool,
}
impl SplitCommand {
/// Execute the split command
pub fn execute<N: ProviderNodeTypes>(self, tool: &DbTool<N>) -> eyre::Result<()>
where
N::Primitives: NodePrimitives<BlockHeader: Compact, SignedTx: Compact, Receipt: Compact>,
{
let segments = self.segments.clone().unwrap_or_else(|| StaticFileSegment::iter().collect());
// Use custom static files dir if provided, otherwise use datadir's static files
let (source_provider, source_dir) =
if let Some(ref static_files_dir) = self.static_files_dir {
let provider = StaticFileProviderBuilder::read_only(static_files_dir)
.build::<N::Primitives>()?;
let dir = static_files_dir.clone();
(provider, dir)
} else {
let provider = tool.provider_factory.static_file_provider();
let dir = provider.directory().to_path_buf();
(provider, dir)
};
// Determine output directory
let (output_dir, is_in_place) = if self.in_place {
let temp_dir = source_dir.with_file_name("static_files.tmp");
(temp_dir, true)
} else {
(self.output_dir.clone().expect("output_dir required when not in_place"), false)
};
info!(
target: "reth::cli",
output_dir = %output_dir.display(),
blocks_per_file = self.blocks_per_file,
?segments,
from_block = ?self.from_block,
to_block = ?self.to_block,
dry_run = self.dry_run,
in_place = is_in_place,
"Splitting static files"
);
if self.dry_run {
println!("Dry run mode - no files will be written");
if is_in_place {
println!("In-place mode:");
println!(" 1. Write to: {}", output_dir.display());
println!(" 2. Verify output integrity");
println!(" 3. Rename {} -> {}.bak", source_dir.display(), source_dir.display());
println!(" 4. Rename {} -> {}", output_dir.display(), source_dir.display());
}
for segment in &segments {
let min_block = source_provider.get_lowest_range_start(*segment);
let max_block = source_provider.get_highest_static_file_block(*segment);
if let (Some(min_block), Some(max_block)) = (min_block, max_block) {
let from_block = self.from_block.unwrap_or(min_block).max(min_block);
let to_block = self.to_block.unwrap_or(max_block).min(max_block);
let num_blocks = to_block.saturating_sub(from_block) + 1;
let num_files = num_blocks.div_ceil(self.blocks_per_file);
println!(
" {segment}: blocks {from_block}..={to_block} ({num_blocks} blocks) -> {num_files} files"
);
} else {
println!(" {segment}: no data available");
}
}
return Ok(());
}
// Clean up output directory if it exists
// For in-place mode: remove previous incomplete temp directory
// For regular mode: ensure we start fresh to avoid block number mismatches
if output_dir.exists() {
info!(target: "reth::cli", output_dir = %output_dir.display(), "Removing existing output directory");
reth_fs_util::remove_dir_all(&output_dir)?;
}
reth_fs_util::create_dir_all(&output_dir)?;
// Calculate segment ranges first to determine the global starting block
let mut segment_ranges = Vec::new();
for &segment in &segments {
let Some(min_block) = source_provider.get_lowest_range_start(segment) else {
continue;
};
let Some(max_block) = source_provider.get_highest_static_file_block(segment) else {
continue;
};
let from_block = self.from_block.unwrap_or(min_block).max(min_block);
let to_block = self.to_block.unwrap_or(max_block).min(max_block);
if from_block <= to_block {
segment_ranges.push((segment, from_block, to_block));
}
}
for (segment, from_block, to_block) in segment_ranges {
info!(target: "reth::cli", ?segment, from_block, to_block, "Processing segment");
// Build output provider per-segment with genesis_block_number set to this segment's
// starting block. This prevents the writer from trying to load non-existent previous
// files when segments have different starting blocks (e.g., pruned transactions).
let output_provider = StaticFileProviderBuilder::read_write(&output_dir)
.with_blocks_per_file(self.blocks_per_file)
.with_genesis_block_number(from_block)
.build::<N::Primitives>()?;
match segment {
StaticFileSegment::Headers => {
self.split_headers::<N>(
&source_provider,
&output_provider,
from_block,
to_block,
)?;
}
StaticFileSegment::Transactions => {
self.split_transactions::<N>(
tool,
&source_provider,
&output_provider,
from_block,
to_block,
)?;
}
StaticFileSegment::Receipts => {
self.split_receipts::<N>(
tool,
&source_provider,
&output_provider,
from_block,
to_block,
)?;
}
StaticFileSegment::TransactionSenders => {
self.split_transaction_senders::<N>(
tool,
&source_provider,
&output_provider,
from_block,
to_block,
)?;
}
StaticFileSegment::AccountChangeSets => {
self.split_account_changesets::<N>(
&source_provider,
&output_provider,
from_block,
to_block,
)?;
}
StaticFileSegment::StorageChangeSets => {
self.split_storage_changesets::<N>(
&source_provider,
&output_provider,
from_block,
to_block,
)?;
}
}
info!(target: "reth::cli", ?segment, "Segment complete");
// Drop the output provider to release file handles before processing next segment
drop(output_provider);
}
// In-place mode: verify and swap directories
if is_in_place {
// Verification step
if !self.skip_verify {
info!(target: "reth::cli", "Verifying output integrity");
self.verify_output::<N>(&output_dir, &segments)?;
}
// Atomic swap
let backup_dir = source_dir.with_file_name("static_files.bak");
// Remove old backup if exists
if backup_dir.exists() {
info!(target: "reth::cli", backup_dir = %backup_dir.display(), "Removing old backup");
reth_fs_util::remove_dir_all(&backup_dir)?;
}
// Drop source provider to release file handles
drop(source_provider);
// Rename: source -> backup
info!(target: "reth::cli",
from = %source_dir.display(),
to = %backup_dir.display(),
"Moving original to backup"
);
reth_fs_util::rename(&source_dir, &backup_dir)?;
// Rename: temp -> source
info!(target: "reth::cli",
from = %output_dir.display(),
to = %source_dir.display(),
"Moving new files into place"
);
reth_fs_util::rename(&output_dir, &source_dir)?;
info!(target: "reth::cli",
backup = %backup_dir.display(),
"In-place split complete. Original files preserved in backup directory"
);
}
info!(target: "reth::cli", "Static file split complete");
Ok(())
}
/// Verify the output static files have valid data
fn verify_output<N: ProviderNodeTypes>(
&self,
output_dir: &PathBuf,
segments: &[StaticFileSegment],
) -> eyre::Result<()> {
let provider = StaticFileProviderBuilder::read_only(output_dir).build::<N::Primitives>()?;
for &segment in segments {
let Some(lowest) = provider.get_lowest_range_start(segment) else {
return Err(eyre::eyre!("Verification failed: no data for segment {segment}"));
};
let Some(highest) = provider.get_highest_static_file_block(segment) else {
return Err(eyre::eyre!("Verification failed: no data for segment {segment}"));
};
// Verify we can read the first and last blocks
provider.get_segment_provider(segment, lowest)?;
provider.get_segment_provider(segment, highest)?;
info!(target: "reth::cli", ?segment, from_block = lowest, to_block = highest, "Verified");
}
Ok(())
}
fn split_headers<N: ProviderNodeTypes>(
&self,
source: &StaticFileProvider<N::Primitives>,
output: &StaticFileProvider<N::Primitives>,
from_block: u64,
to_block: u64,
) -> eyre::Result<()>
where
<N::Primitives as NodePrimitives>::BlockHeader: Compact,
{
let mut writer = output.get_writer(from_block, StaticFileSegment::Headers)?;
for block in from_block..=to_block {
let jar = source.get_segment_provider(StaticFileSegment::Headers, block)?;
let mut cursor = jar.cursor()?;
let header: <N::Primitives as NodePrimitives>::BlockHeader = cursor
.get_one::<HeaderMask<_>>(block.into())?
.ok_or_else(|| eyre::eyre!("Missing header for block {block}"))?;
let td: CompactU256 = cursor
.get_one::<TotalDifficultyMask>(block.into())?
.ok_or_else(|| eyre::eyre!("Missing TD for block {block}"))?;
let hash = cursor
.get_one::<BlockHashMask>(block.into())?
.ok_or_else(|| eyre::eyre!("Missing hash for block {block}"))?;
writer.append_header_with_td(&header, td.into(), &hash)?;
if block % 100_000 == 0 {
info!(target: "reth::cli", block, to_block, "Headers progress");
}
}
writer.commit()?;
Ok(())
}
fn split_transactions<N: ProviderNodeTypes>(
&self,
tool: &DbTool<N>,
source: &StaticFileProvider<N::Primitives>,
output: &StaticFileProvider<N::Primitives>,
from_block: u64,
to_block: u64,
) -> eyre::Result<()>
where
<N::Primitives as NodePrimitives>::SignedTx: Compact,
{
let tx = tool.provider_factory.provider()?.into_tx();
let mut indices_cursor = tx.cursor_read::<tables::BlockBodyIndices>()?;
let mut writer = output.get_writer(from_block, StaticFileSegment::Transactions)?;
for block in from_block..=to_block {
writer.increment_block(block)?;
if let Some(indices) = indices_cursor.seek_exact(block)?.map(|(_, v)| v) {
let first_tx = indices.first_tx_num;
let tx_count = indices.tx_count;
for tx_num in first_tx..first_tx + tx_count {
let jar =
source.get_segment_provider(StaticFileSegment::Transactions, tx_num)?;
let mut cursor = jar.cursor()?;
let transaction: <N::Primitives as NodePrimitives>::SignedTx = cursor
.get_one::<TransactionMask<_>>(tx_num.into())?
.ok_or_else(|| eyre::eyre!("Missing transaction {tx_num}"))?;
writer.append_transaction(tx_num, &transaction)?;
}
}
if block % 100_000 == 0 {
info!(target: "reth::cli", block, to_block, "Transactions progress");
}
}
writer.commit()?;
Ok(())
}
fn split_receipts<N: ProviderNodeTypes>(
&self,
tool: &DbTool<N>,
source: &StaticFileProvider<N::Primitives>,
output: &StaticFileProvider<N::Primitives>,
from_block: u64,
to_block: u64,
) -> eyre::Result<()>
where
<N::Primitives as NodePrimitives>::Receipt: Compact,
{
let tx = tool.provider_factory.provider()?.into_tx();
let mut indices_cursor = tx.cursor_read::<tables::BlockBodyIndices>()?;
let mut writer = output.get_writer(from_block, StaticFileSegment::Receipts)?;
for block in from_block..=to_block {
writer.increment_block(block)?;
if let Some(indices) = indices_cursor.seek_exact(block)?.map(|(_, v)| v) {
let first_tx = indices.first_tx_num;
let tx_count = indices.tx_count;
for tx_num in first_tx..first_tx + tx_count {
let jar = source.get_segment_provider(StaticFileSegment::Receipts, tx_num)?;
let mut cursor = jar.cursor()?;
let receipt: <N::Primitives as NodePrimitives>::Receipt = cursor
.get_one::<ReceiptMask<_>>(tx_num.into())?
.ok_or_else(|| eyre::eyre!("Missing receipt {tx_num}"))?;
writer.append_receipt(tx_num, &receipt)?;
}
}
if block % 100_000 == 0 {
info!(target: "reth::cli", block, to_block, "Receipts progress");
}
}
writer.commit()?;
Ok(())
}
fn split_transaction_senders<N: ProviderNodeTypes>(
&self,
tool: &DbTool<N>,
source: &StaticFileProvider<N::Primitives>,
output: &StaticFileProvider<N::Primitives>,
from_block: u64,
to_block: u64,
) -> eyre::Result<()> {
let tx = tool.provider_factory.provider()?.into_tx();
let mut indices_cursor = tx.cursor_read::<tables::BlockBodyIndices>()?;
let mut writer = output.get_writer(from_block, StaticFileSegment::TransactionSenders)?;
for block in from_block..=to_block {
writer.increment_block(block)?;
if let Some(indices) = indices_cursor.seek_exact(block)?.map(|(_, v)| v) {
let first_tx = indices.first_tx_num;
let tx_count = indices.tx_count;
for tx_num in first_tx..first_tx + tx_count {
let jar = source
.get_segment_provider(StaticFileSegment::TransactionSenders, tx_num)?;
let mut cursor = jar.cursor()?;
let sender = cursor
.get_one::<TransactionSenderMask>(tx_num.into())?
.ok_or_else(|| eyre::eyre!("Missing sender {tx_num}"))?;
writer.append_transaction_sender(tx_num, &sender)?;
}
}
if block % 100_000 == 0 {
info!(target: "reth::cli", block, to_block, "Transaction senders progress");
}
}
writer.commit()?;
Ok(())
}
fn split_account_changesets<N: ProviderNodeTypes>(
&self,
source: &StaticFileProvider<N::Primitives>,
output: &StaticFileProvider<N::Primitives>,
from_block: u64,
to_block: u64,
) -> eyre::Result<()> {
let mut writer = output.get_writer(from_block, StaticFileSegment::AccountChangeSets)?;
for block in from_block..=to_block {
let jar = source.get_segment_provider(StaticFileSegment::AccountChangeSets, block)?;
let mut changes = Vec::new();
if let Some(offset) = jar.read_changeset_offset(block)? {
let mut cursor = jar.cursor()?;
for i in offset.changeset_range() {
if let Some(change) = cursor.get_one::<AccountChangesetMask>(i.into())? {
changes.push(change);
}
}
}
writer.append_account_changeset(changes, block)?;
if block % 100_000 == 0 {
info!(target: "reth::cli", block, to_block, "Account changesets progress");
}
}
writer.commit()?;
Ok(())
}
fn split_storage_changesets<N: ProviderNodeTypes>(
&self,
source: &StaticFileProvider<N::Primitives>,
output: &StaticFileProvider<N::Primitives>,
from_block: u64,
to_block: u64,
) -> eyre::Result<()> {
let mut writer = output.get_writer(from_block, StaticFileSegment::StorageChangeSets)?;
for block in from_block..=to_block {
let jar = source.get_segment_provider(StaticFileSegment::StorageChangeSets, block)?;
let mut changes = Vec::new();
if let Some(offset) = jar.read_changeset_offset(block)? {
let mut cursor = jar.cursor()?;
for i in offset.changeset_range() {
if let Some(change) = cursor.get_one::<StorageChangesetMask>(i.into())? {
changes.push(change);
}
}
}
writer.append_storage_changeset(changes, block)?;
if block % 100_000 == 0 {
info!(target: "reth::cli", block, to_block, "Storage changesets progress");
}
}
writer.commit()?;
Ok(())
}
}
#[cfg(test)]
mod tests {
use super::*;
use clap::Parser;
#[derive(Parser)]
struct TestCli {
#[command(subcommand)]
command: TestCommand,
}
#[derive(clap::Subcommand)]
enum TestCommand {
Split(SplitCommand),
}
#[test]
fn parse_split_command_minimal() {
let args = TestCli::try_parse_from([
"test",
"split",
"--output-dir",
"/tmp/output",
"--blocks-per-file",
"100000",
])
.unwrap();
match args.command {
TestCommand::Split(cmd) => {
assert_eq!(cmd.output_dir, Some(PathBuf::from("/tmp/output")));
assert_eq!(cmd.blocks_per_file, 100000);
assert!(cmd.segments.is_none());
assert!(cmd.from_block.is_none());
assert!(cmd.to_block.is_none());
assert!(!cmd.dry_run);
assert!(!cmd.in_place);
}
}
}
#[test]
fn parse_split_command_full() {
let args = TestCli::try_parse_from([
"test",
"split",
"--output-dir",
"/tmp/output",
"--blocks-per-file",
"50000",
"--segments",
"headers,receipts",
"--from-block",
"1000",
"--to-block",
"500000",
"--dry-run",
])
.unwrap();
match args.command {
TestCommand::Split(cmd) => {
assert_eq!(cmd.output_dir, Some(PathBuf::from("/tmp/output")));
assert_eq!(cmd.blocks_per_file, 50000);
assert_eq!(
cmd.segments,
Some(vec![StaticFileSegment::Headers, StaticFileSegment::Receipts])
);
assert_eq!(cmd.from_block, Some(1000));
assert_eq!(cmd.to_block, Some(500000));
assert!(cmd.dry_run);
assert!(!cmd.in_place);
}
}
}
#[test]
fn parse_split_command_in_place() {
let args =
TestCli::try_parse_from(["test", "split", "--in-place", "--blocks-per-file", "100000"])
.unwrap();
match args.command {
TestCommand::Split(cmd) => {
assert!(cmd.output_dir.is_none());
assert_eq!(cmd.blocks_per_file, 100000);
assert!(cmd.in_place);
assert!(!cmd.skip_verify);
}
}
}
#[test]
fn parse_split_command_in_place_skip_verify() {
let args = TestCli::try_parse_from([
"test",
"split",
"--in-place",
"--skip-verify",
"--blocks-per-file",
"100000",
])
.unwrap();
match args.command {
TestCommand::Split(cmd) => {
assert!(cmd.in_place);
assert!(cmd.skip_verify);
}
}
}
#[test]
fn parse_split_command_output_dir_conflicts_with_in_place() {
let result = TestCli::try_parse_from([
"test",
"split",
"--output-dir",
"/tmp/out",
"--in-place",
"--blocks-per-file",
"100000",
]);
assert!(result.is_err());
}
#[test]
fn parse_split_command_skip_verify_requires_in_place() {
// --skip-verify without --in-place should fail
let result = TestCli::try_parse_from([
"test",
"split",
"--skip-verify",
"--blocks-per-file",
"100000",
]);
assert!(result.is_err(), "--skip-verify should require --in-place");
}
#[test]
fn parse_split_command_all_segments() {
let args = TestCli::try_parse_from([
"test",
"split",
"--output-dir",
"/tmp/out",
"--blocks-per-file",
"10",
"--segments",
"headers,transactions,receipts,transaction-senders,account-change-sets,storage-change-sets",
])
.unwrap();
match args.command {
TestCommand::Split(cmd) => {
let segments = cmd.segments.unwrap();
assert_eq!(segments.len(), 6);
assert!(segments.contains(&StaticFileSegment::Headers));
assert!(segments.contains(&StaticFileSegment::Transactions));
assert!(segments.contains(&StaticFileSegment::Receipts));
assert!(segments.contains(&StaticFileSegment::TransactionSenders));
assert!(segments.contains(&StaticFileSegment::AccountChangeSets));
assert!(segments.contains(&StaticFileSegment::StorageChangeSets));
}
}
}
}

View File

@@ -384,19 +384,15 @@ fn resumable_download(url: &str, target_dir: &Path) -> Result<(PathBuf, u64)> {
let mut total_size: Option<u64> = None;
let mut last_error: Option<eyre::Error> = None;
let finalize_download = |size: u64| -> Result<(PathBuf, u64)> {
fs::rename(&part_path, &final_path)?;
info!(target: "reth::cli", "Download complete: {}", final_path.display());
Ok((final_path.clone(), size))
};
for attempt in 1..=MAX_DOWNLOAD_RETRIES {
let existing_size = fs::metadata(&part_path).map(|m| m.len()).unwrap_or(0);
if let Some(total) = total_size &&
existing_size >= total
{
return finalize_download(total);
fs::rename(&part_path, &final_path)?;
info!(target: "reth::cli", "Download complete: {}", final_path.display());
return Ok((final_path, total));
}
if attempt > 1 {
@@ -480,7 +476,9 @@ fn resumable_download(url: &str, target_dir: &Path) -> Result<(PathBuf, u64)> {
continue;
}
return finalize_download(current_total);
fs::rename(&part_path, &final_path)?;
info!(target: "reth::cli", "Download complete: {}", final_path.display());
return Ok((final_path, current_total));
}
Err(last_error

View File

@@ -44,11 +44,11 @@ pub struct ExportArgs {
impl<C: ChainSpecParser<ChainSpec: EthChainSpec + EthereumHardforks>> ExportEraCommand<C> {
/// Execute `export-era` command
pub async fn execute<N>(self, runtime: reth_tasks::Runtime) -> eyre::Result<()>
pub async fn execute<N>(self) -> eyre::Result<()>
where
N: CliNodeTypes<ChainSpec = C::ChainSpec>,
{
let Environment { provider_factory, .. } = self.env.init::<N>(AccessRights::RO, runtime)?;
let Environment { provider_factory, .. } = self.env.init::<N>(AccessRights::RO)?;
// Either specified path or default to `<data-dir>/<chain>/era1-export/`
let data_dir = match &self.export.path {

View File

@@ -47,7 +47,6 @@ impl<C: ChainSpecParser<ChainSpec: EthChainSpec + EthereumHardforks>> ImportComm
pub async fn execute<N, Comp>(
self,
components: impl FnOnce(Arc<N::ChainSpec>) -> Comp,
runtime: reth_tasks::Runtime,
) -> eyre::Result<()>
where
N: CliNodeTypes<ChainSpec = C::ChainSpec>,
@@ -55,8 +54,7 @@ impl<C: ChainSpecParser<ChainSpec: EthChainSpec + EthereumHardforks>> ImportComm
{
info!(target: "reth::cli", "reth {} starting", version_metadata().short_version);
let Environment { provider_factory, config, .. } =
self.env.init::<N>(AccessRights::RW, runtime.clone())?;
let Environment { provider_factory, config, .. } = self.env.init::<N>(AccessRights::RW)?;
let components = components(provider_factory.chain_spec());
@@ -87,7 +85,6 @@ impl<C: ChainSpecParser<ChainSpec: EthChainSpec + EthereumHardforks>> ImportComm
&config,
executor.clone(),
consensus.clone(),
runtime.clone(),
)
.await?;

View File

@@ -87,7 +87,6 @@ pub async fn import_blocks_from_file<N>(
config: &Config,
executor: impl ConfigureEvm<Primitives = N::Primitives> + 'static,
consensus: Arc<impl FullConsensus<N::Primitives> + 'static>,
runtime: reth_tasks::Runtime,
) -> eyre::Result<ImportResult>
where
N: ProviderNodeTypes,
@@ -148,7 +147,6 @@ where
static_file_producer.clone(),
import_config.no_state,
executor.clone(),
runtime.clone(),
)?;
// override the tip
@@ -259,7 +257,6 @@ where
///
/// If configured to execute, all stages will run. Otherwise, only stages that don't require state
/// will run.
#[expect(clippy::too_many_arguments)]
pub fn build_import_pipeline_impl<N, C, E>(
config: &Config,
provider_factory: ProviderFactory<N>,
@@ -268,7 +265,6 @@ pub fn build_import_pipeline_impl<N, C, E>(
static_file_producer: StaticFileProducer<ProviderFactory<N>>,
disable_exec: bool,
evm_config: E,
runtime: reth_tasks::Runtime,
) -> eyre::Result<(Pipeline<N>, impl futures::Stream<Item = NodeEvent<N::Primitives>> + use<N, C, E>)>
where
N: ProviderNodeTypes,
@@ -287,7 +283,7 @@ where
let mut header_downloader = ReverseHeadersDownloaderBuilder::new(config.stages.headers)
.build(file_client.clone(), consensus.clone())
.into_task_with(&runtime);
.into_task();
// TODO: The pipeline should correctly configure the downloader on its own.
// Find the possibility to remove unnecessary pre-configuration.
header_downloader.update_local_head(local_head);
@@ -295,7 +291,7 @@ where
let mut body_downloader = BodiesDownloaderBuilder::new(config.stages.bodies)
.build(file_client.clone(), consensus.clone(), provider_factory.clone())
.into_task_with(&runtime);
.into_task();
// TODO: The pipeline should correctly configure the downloader on its own.
// Find the possibility to remove unnecessary pre-configuration.
body_downloader

View File

@@ -64,14 +64,13 @@ impl TryFromChain for ChainKind {
impl<C: ChainSpecParser<ChainSpec: EthChainSpec + EthereumHardforks>> ImportEraCommand<C> {
/// Execute `import-era` command
pub async fn execute<N>(self, runtime: reth_tasks::Runtime) -> eyre::Result<()>
pub async fn execute<N>(self) -> eyre::Result<()>
where
N: CliNodeTypes<ChainSpec = C::ChainSpec>,
{
info!(target: "reth::cli", "reth {} starting", version_metadata().short_version);
let Environment { provider_factory, config, .. } =
self.env.init::<N>(AccessRights::RW, runtime)?;
let Environment { provider_factory, config, .. } = self.env.init::<N>(AccessRights::RW)?;
let mut hash_collector = Collector::new(config.stages.etl.file_size, config.stages.etl.dir);

View File

@@ -18,13 +18,10 @@ pub struct InitCommand<C: ChainSpecParser> {
impl<C: ChainSpecParser<ChainSpec: EthChainSpec + EthereumHardforks>> InitCommand<C> {
/// Execute the `init` command
pub async fn execute<N: CliNodeTypes<ChainSpec = C::ChainSpec>>(
self,
runtime: reth_tasks::Runtime,
) -> eyre::Result<()> {
pub async fn execute<N: CliNodeTypes<ChainSpec = C::ChainSpec>>(self) -> eyre::Result<()> {
info!(target: "reth::cli", "reth init starting");
let Environment { provider_factory, .. } = self.env.init::<N>(AccessRights::RW, runtime)?;
let Environment { provider_factory, .. } = self.env.init::<N>(AccessRights::RW)?;
let genesis_block_number = provider_factory.chain_spec().genesis_header().number();
let hash = provider_factory

View File

@@ -65,7 +65,7 @@ pub struct InitStateCommand<C: ChainSpecParser> {
impl<C: ChainSpecParser<ChainSpec: EthChainSpec + EthereumHardforks>> InitStateCommand<C> {
/// Execute the `init` command
pub async fn execute<N>(self, runtime: reth_tasks::Runtime) -> eyre::Result<()>
pub async fn execute<N>(self) -> eyre::Result<()>
where
N: CliNodeTypes<
ChainSpec = C::ChainSpec,
@@ -74,8 +74,7 @@ impl<C: ChainSpecParser<ChainSpec: EthChainSpec + EthereumHardforks>> InitStateC
{
info!(target: "reth::cli", "Reth init-state starting");
let Environment { config, provider_factory, .. } =
self.env.init::<N>(AccessRights::RW, runtime)?;
let Environment { config, provider_factory, .. } = self.env.init::<N>(AccessRights::RW)?;
let static_file_provider = provider_factory.static_file_provider();
let provider_rw = provider_factory.database_provider_rw()?;

View File

@@ -16,7 +16,6 @@ use reth_node_core::{
args::{DatadirArgs, NetworkArgs},
utils::get_single_header,
};
use reth_tasks::Runtime;
pub mod bootnode;
pub mod enode;
@@ -195,18 +194,17 @@ impl<C: ChainSpecParser> DownloadArgs<C> {
let rlpx_socket = (self.network.addr, self.network.port).into();
let boot_nodes = self.chain.bootnodes().unwrap_or_default();
let net =
NetworkConfigBuilder::<N::NetworkPrimitives>::new(p2p_secret_key, Runtime::test())
.peer_config(config.peers_config_with_basic_nodes_from_file(None))
.external_ip_resolver(self.network.nat.clone())
.network_id(self.network.network_id)
.boot_nodes(boot_nodes.clone())
.apply(|builder| {
self.network.discovery.apply_to_builder(builder, rlpx_socket, boot_nodes)
})
.build_with_noop_provider(self.chain.clone())
.manager()
.await?;
let net = NetworkConfigBuilder::<N::NetworkPrimitives>::new(p2p_secret_key)
.peer_config(config.peers_config_with_basic_nodes_from_file(None))
.external_ip_resolver(self.network.nat.clone())
.network_id(self.network.network_id)
.boot_nodes(boot_nodes.clone())
.apply(|builder| {
self.network.discovery.apply_to_builder(builder, rlpx_socket, boot_nodes)
})
.build_with_noop_provider(self.chain.clone())
.manager()
.await?;
let handle = net.handle().clone();
tokio::task::spawn(net);

View File

@@ -36,7 +36,7 @@ impl<C: ChainSpecParser<ChainSpec: EthChainSpec + EthereumHardforks>> PruneComma
self,
ctx: CliContext,
) -> eyre::Result<()> {
let env = self.env.init::<N>(AccessRights::RW, ctx.task_executor.clone())?;
let env = self.env.init::<N>(AccessRights::RW)?;
let provider_factory = env.provider_factory;
let config = env.config.prune;
let data_dir = env.data_dir;

View File

@@ -20,10 +20,7 @@ use reth_provider::{
use reth_revm::database::StateProviderDatabase;
use reth_stages::stages::calculate_gas_used_from_headers;
use std::{
sync::{
atomic::{AtomicU64, Ordering},
Arc,
},
sync::Arc,
time::{Duration, Instant},
};
use tokio::{sync::mpsc, task::JoinSet};
@@ -49,10 +46,6 @@ pub struct Command<C: ChainSpecParser> {
#[arg(long)]
num_tasks: Option<u64>,
/// Number of blocks each worker processes before grabbing the next chunk.
#[arg(long, default_value = "5000")]
blocks_per_chunk: u64,
/// Continues with execution when an invalid block is encountered and collects these blocks.
#[arg(long)]
skip_invalid_blocks: bool,
@@ -67,15 +60,11 @@ impl<C: ChainSpecParser> Command<C> {
impl<C: ChainSpecParser<ChainSpec: EthChainSpec + Hardforks + EthereumHardforks>> Command<C> {
/// Execute `re-execute` command
pub async fn execute<N>(
self,
components: impl CliComponentsBuilder<N>,
runtime: reth_tasks::Runtime,
) -> eyre::Result<()>
pub async fn execute<N>(self, components: impl CliComponentsBuilder<N>) -> eyre::Result<()>
where
N: CliNodeTypes<ChainSpec = C::ChainSpec>,
{
let Environment { provider_factory, .. } = self.env.init::<N>(AccessRights::RO, runtime)?;
let Environment { provider_factory, .. } = self.env.init::<N>(AccessRights::RO)?;
let components = components(provider_factory.chain_spec());
@@ -99,10 +88,12 @@ impl<C: ChainSpecParser<ChainSpec: EthChainSpec + Hardforks + EthereumHardforks>
std::thread::available_parallelism().map(|n| n.get() as u64).unwrap_or(10)
});
let total_blocks = max_block - min_block;
let total_gas = calculate_gas_used_from_headers(
&provider_factory.static_file_provider(),
min_block..=max_block,
)?;
let blocks_per_task = total_blocks / num_tasks;
let db_at = {
let provider_factory = provider_factory.clone();
@@ -114,17 +105,18 @@ impl<C: ChainSpecParser<ChainSpec: EthChainSpec + Hardforks + EthereumHardforks>
};
let skip_invalid_blocks = self.skip_invalid_blocks;
let blocks_per_chunk = self.blocks_per_chunk;
let (stats_tx, mut stats_rx) = mpsc::unbounded_channel();
let (info_tx, mut info_rx) = mpsc::unbounded_channel();
let cancellation = CancellationToken::new();
let _guard = cancellation.drop_guard();
// Shared counter for work stealing: workers atomically grab the next chunk of blocks.
let next_block = Arc::new(AtomicU64::new(min_block));
let mut tasks = JoinSet::new();
for _ in 0..num_tasks {
for i in 0..num_tasks {
let start_block = min_block + i * blocks_per_task;
let end_block =
if i == num_tasks - 1 { max_block } else { start_block + blocks_per_task };
// Spawn thread executing blocks
let provider_factory = provider_factory.clone();
let evm_config = components.evm_config().clone();
let consensus = components.consensus().clone();
@@ -132,122 +124,95 @@ impl<C: ChainSpecParser<ChainSpec: EthChainSpec + Hardforks + EthereumHardforks>
let stats_tx = stats_tx.clone();
let info_tx = info_tx.clone();
let cancellation = cancellation.clone();
let next_block = Arc::clone(&next_block);
tasks.spawn_blocking(move || {
let mut executor = evm_config.batch_executor(db_at(start_block - 1));
let mut executor_created = Instant::now();
let executor_lifetime = Duration::from_secs(120);
loop {
'blocks: for block in start_block..end_block {
if cancellation.is_cancelled() {
break;
// exit if the program is being terminated
break
}
// Atomically grab the next chunk of blocks.
let chunk_start =
next_block.fetch_add(blocks_per_chunk, Ordering::Relaxed);
if chunk_start >= max_block {
break;
}
let chunk_end = (chunk_start + blocks_per_chunk).min(max_block);
let block = provider_factory
.recovered_block(block.into(), TransactionVariant::NoHash)?
.unwrap();
let mut executor = evm_config.batch_executor(db_at(chunk_start - 1));
let mut executor_created = Instant::now();
'blocks: for block in chunk_start..chunk_end {
if cancellation.is_cancelled() {
break;
}
let block = provider_factory
.recovered_block(block.into(), TransactionVariant::NoHash)?
.unwrap();
let result = match executor.execute_one(&block) {
Ok(result) => result,
Err(err) => {
if skip_invalid_blocks {
executor =
evm_config.batch_executor(db_at(block.number()));
let _ =
info_tx.send((block, eyre::Report::new(err)));
continue
}
return Err(err.into())
let result = match executor.execute_one(&block) {
Ok(result) => result,
Err(err) => {
if skip_invalid_blocks {
executor = evm_config.batch_executor(db_at(block.number()));
let _ = info_tx.send((block, eyre::Report::new(err)));
continue
}
};
return Err(err.into())
}
};
if let Err(err) = consensus
.validate_block_post_execution(&block, &result, None)
.wrap_err_with(|| {
format!(
"Failed to validate block {} {}",
block.number(),
block.hash()
)
})
if let Err(err) = consensus
.validate_block_post_execution(&block, &result, None)
.wrap_err_with(|| {
format!("Failed to validate block {} {}", block.number(), block.hash())
})
{
let correct_receipts =
provider_factory.receipts_by_block(block.number().into())?.unwrap();
for (i, (receipt, correct_receipt)) in
result.receipts.iter().zip(correct_receipts.iter()).enumerate()
{
let correct_receipts = provider_factory
.receipts_by_block(block.number().into())?
.unwrap();
if receipt != correct_receipt {
let tx_hash = block.body().transactions()[i].tx_hash();
error!(
?receipt,
?correct_receipt,
index = i,
?tx_hash,
"Invalid receipt"
);
let expected_gas_used = correct_receipt.cumulative_gas_used() -
if i == 0 {
0
} else {
correct_receipts[i - 1].cumulative_gas_used()
};
let got_gas_used = receipt.cumulative_gas_used() -
if i == 0 {
0
} else {
result.receipts[i - 1].cumulative_gas_used()
};
if got_gas_used != expected_gas_used {
let mismatch = GotExpected {
expected: expected_gas_used,
got: got_gas_used,
};
for (i, (receipt, correct_receipt)) in
result.receipts.iter().zip(correct_receipts.iter()).enumerate()
{
if receipt != correct_receipt {
let tx_hash =
block.body().transactions()[i].tx_hash();
error!(
?receipt,
?correct_receipt,
index = i,
?tx_hash,
"Invalid receipt"
);
let expected_gas_used =
correct_receipt.cumulative_gas_used() -
if i == 0 {
0
} else {
correct_receipts[i - 1]
.cumulative_gas_used()
};
let got_gas_used = receipt.cumulative_gas_used() -
if i == 0 {
0
} else {
result.receipts[i - 1].cumulative_gas_used()
};
if got_gas_used != expected_gas_used {
let mismatch = GotExpected {
expected: expected_gas_used,
got: got_gas_used,
};
error!(number=?block.number(), ?mismatch, "Gas usage mismatch");
if skip_invalid_blocks {
executor = evm_config
.batch_executor(db_at(block.number()));
let _ = info_tx.send((block, err));
continue 'blocks;
}
return Err(err);
error!(number=?block.number(), ?mismatch, "Gas usage mismatch");
if skip_invalid_blocks {
executor = evm_config.batch_executor(db_at(block.number()));
let _ = info_tx.send((block, err));
continue 'blocks;
}
} else {
continue;
return Err(err);
}
} else {
continue;
}
return Err(err);
}
let _ = stats_tx.send(block.gas_used());
// Reset DB once in a while to avoid OOM or read tx timeouts
if executor.size_hint() > 1_000_000 ||
executor_created.elapsed() > executor_lifetime
{
executor =
evm_config.batch_executor(db_at(block.number()));
executor_created = Instant::now();
}
return Err(err);
}
let _ = stats_tx.send(block.gas_used());
// Reset DB once in a while to avoid OOM or read tx timeouts
if executor.size_hint() > 1_000_000 ||
executor_created.elapsed() > executor_lifetime
{
executor = evm_config.batch_executor(db_at(block.number()));
executor_created = Instant::now();
}
}

View File

@@ -37,11 +37,11 @@ pub struct Command<C: ChainSpecParser> {
impl<C: ChainSpecParser> Command<C> {
/// Execute `db` command
pub async fn execute<N: CliNodeTypes>(self, runtime: reth_tasks::Runtime) -> eyre::Result<()>
pub async fn execute<N: CliNodeTypes>(self) -> eyre::Result<()>
where
C: ChainSpecParser<ChainSpec = N::ChainSpec>,
{
let Environment { provider_factory, .. } = self.env.init::<N>(AccessRights::RW, runtime)?;
let Environment { provider_factory, .. } = self.env.init::<N>(AccessRights::RW)?;
let tool = DbTool::new(provider_factory)?;

View File

@@ -16,7 +16,6 @@ use reth_stages::{stages::ExecutionStage, Stage, StageCheckpoint, UnwindInput};
use std::sync::Arc;
use tracing::info;
#[expect(clippy::too_many_arguments)]
pub(crate) async fn dump_execution_stage<N, E, C>(
db_tool: &DbTool<N>,
from: u64,
@@ -25,7 +24,6 @@ pub(crate) async fn dump_execution_stage<N, E, C>(
should_run: bool,
evm_config: E,
consensus: C,
runtime: reth_tasks::Runtime,
) -> eyre::Result<()>
where
N: ProviderNodeTypes<DB = DatabaseEnv>,
@@ -39,6 +37,7 @@ where
unwind_and_copy(db_tool, from, tip_block_number, &output_db, evm_config.clone())?;
if should_run {
let runtime = reth_tasks::Runtime::with_existing_handle(tokio::runtime::Handle::current())?;
dry_run(
ProviderFactory::<N>::new(
output_db,

View File

@@ -18,7 +18,6 @@ pub(crate) async fn dump_hashing_account_stage<N: ProviderNodeTypes<DB = Databas
to: BlockNumber,
output_datadir: ChainPath<DataDirPath>,
should_run: bool,
runtime: reth_tasks::Runtime,
) -> Result<()> {
let (output_db, tip_block_number) = setup(from, to, &output_datadir.db(), db_tool)?;
@@ -34,6 +33,7 @@ pub(crate) async fn dump_hashing_account_stage<N: ProviderNodeTypes<DB = Databas
unwind_and_copy(db_tool, from, tip_block_number, &output_db)?;
if should_run {
let runtime = reth_tasks::Runtime::with_existing_handle(tokio::runtime::Handle::current())?;
dry_run(
ProviderFactory::<N>::new(
output_db,

View File

@@ -17,13 +17,13 @@ pub(crate) async fn dump_hashing_storage_stage<N: ProviderNodeTypes<DB = Databas
to: u64,
output_datadir: ChainPath<DataDirPath>,
should_run: bool,
runtime: reth_tasks::Runtime,
) -> Result<()> {
let (output_db, tip_block_number) = setup(from, to, &output_datadir.db(), db_tool)?;
unwind_and_copy(db_tool, from, tip_block_number, &output_db)?;
if should_run {
let runtime = reth_tasks::Runtime::with_existing_handle(tokio::runtime::Handle::current())?;
dry_run(
ProviderFactory::<N>::new(
output_db,

View File

@@ -24,7 +24,6 @@ use reth_stages::{
};
use tracing::info;
#[expect(clippy::too_many_arguments)]
pub(crate) async fn dump_merkle_stage<N>(
db_tool: &DbTool<N>,
from: BlockNumber,
@@ -33,7 +32,6 @@ pub(crate) async fn dump_merkle_stage<N>(
should_run: bool,
evm_config: impl ConfigureEvm<Primitives = N::Primitives>,
consensus: impl FullConsensus<N::Primitives> + 'static,
runtime: reth_tasks::Runtime,
) -> Result<()>
where
N: ProviderNodeTypes<DB = DatabaseEnv>,
@@ -59,6 +57,7 @@ where
unwind_and_copy(db_tool, (from, to), tip_block_number, &output_db, evm_config, consensus)?;
if should_run {
let runtime = reth_tasks::Runtime::with_existing_handle(tokio::runtime::Handle::current())?;
dry_run(
ProviderFactory::<N>::new(
output_db,

View File

@@ -72,36 +72,30 @@ pub struct StageCommand {
}
macro_rules! handle_stage {
($stage_fn:ident, $tool:expr, $command:expr, $runtime:expr) => {{
($stage_fn:ident, $tool:expr, $command:expr) => {{
let StageCommand { output_datadir, from, to, dry_run, .. } = $command;
let output_datadir =
output_datadir.with_chain($tool.chain().chain(), DatadirArgs::default());
$stage_fn($tool, *from, *to, output_datadir, *dry_run, $runtime).await?
$stage_fn($tool, *from, *to, output_datadir, *dry_run).await?
}};
($stage_fn:ident, $tool:expr, $command:expr, $executor:expr, $consensus:expr, $runtime:expr) => {{
($stage_fn:ident, $tool:expr, $command:expr, $executor:expr, $consensus:expr) => {{
let StageCommand { output_datadir, from, to, dry_run, .. } = $command;
let output_datadir =
output_datadir.with_chain($tool.chain().chain(), DatadirArgs::default());
$stage_fn($tool, *from, *to, output_datadir, *dry_run, $executor, $consensus, $runtime)
.await?
$stage_fn($tool, *from, *to, output_datadir, *dry_run, $executor, $consensus).await?
}};
}
impl<C: ChainSpecParser<ChainSpec: EthChainSpec + EthereumHardforks>> Command<C> {
/// Execute `dump-stage` command
pub async fn execute<N, Comp, F>(
self,
components: F,
runtime: reth_tasks::Runtime,
) -> eyre::Result<()>
pub async fn execute<N, Comp, F>(self, components: F) -> eyre::Result<()>
where
N: CliNodeTypes<ChainSpec = C::ChainSpec>,
Comp: CliNodeComponents<N>,
F: FnOnce(Arc<C::ChainSpec>) -> Comp,
{
let Environment { provider_factory, .. } =
self.env.init::<N>(AccessRights::RO, runtime.clone())?;
let Environment { provider_factory, .. } = self.env.init::<N>(AccessRights::RO)?;
let tool = DbTool::new(provider_factory)?;
let components = components(tool.chain());
let evm_config = components.evm_config().clone();
@@ -109,23 +103,12 @@ impl<C: ChainSpecParser<ChainSpec: EthChainSpec + EthereumHardforks>> Command<C>
match &self.command {
Stages::Execution(cmd) => {
handle_stage!(
dump_execution_stage,
&tool,
cmd,
evm_config,
consensus,
runtime.clone()
)
}
Stages::StorageHashing(cmd) => {
handle_stage!(dump_hashing_storage_stage, &tool, cmd, runtime.clone())
}
Stages::AccountHashing(cmd) => {
handle_stage!(dump_hashing_account_stage, &tool, cmd, runtime.clone())
handle_stage!(dump_execution_stage, &tool, cmd, evm_config, consensus)
}
Stages::StorageHashing(cmd) => handle_stage!(dump_hashing_storage_stage, &tool, cmd),
Stages::AccountHashing(cmd) => handle_stage!(dump_hashing_account_stage, &tool, cmd),
Stages::Merkle(cmd) => {
handle_stage!(dump_merkle_stage, &tool, cmd, evm_config, consensus, runtime.clone())
handle_stage!(dump_merkle_stage, &tool, cmd, evm_config, consensus)
}
}

View File

@@ -49,12 +49,11 @@ impl<C: ChainSpecParser<ChainSpec: EthChainSpec + Hardforks + EthereumHardforks>
N: CliNodeTypes<ChainSpec = C::ChainSpec>,
Comp: CliNodeComponents<N>,
{
let executor = ctx.task_executor.clone();
match self.command {
Subcommands::Run(command) => command.execute::<N, _, _>(ctx, components).await,
Subcommands::Drop(command) => command.execute::<N>(executor).await,
Subcommands::Dump(command) => command.execute::<N, _, _>(components, executor).await,
Subcommands::Unwind(command) => command.execute::<N, _, _>(components, executor).await,
Subcommands::Drop(command) => command.execute::<N>().await,
Subcommands::Dump(command) => command.execute::<N, _, _>(components).await,
Subcommands::Unwind(command) => command.execute::<N, _, _>(components).await,
}
}
}

View File

@@ -119,9 +119,8 @@ impl<C: ChainSpecParser<ChainSpec: EthChainSpec + Hardforks + EthereumHardforks>
// Does not do anything on windows.
let _ = fdlimit::raise_fd_limit();
let runtime = ctx.task_executor.clone();
let Environment { provider_factory, config, data_dir } =
self.env.init::<N>(AccessRights::RW, ctx.task_executor.clone())?;
self.env.init::<N>(AccessRights::RW)?;
let mut provider_rw = provider_factory.database_provider_rw()?;
let components = components(provider_factory.chain_spec());
@@ -172,7 +171,6 @@ impl<C: ChainSpecParser<ChainSpec: EthChainSpec + Hardforks + EthereumHardforks>
provider_factory.chain_spec(),
p2p_secret_key,
default_peers_path,
runtime.clone(),
)
.build(provider_factory.clone())
.start_network()
@@ -228,7 +226,6 @@ impl<C: ChainSpecParser<ChainSpec: EthChainSpec + Hardforks + EthereumHardforks>
provider_factory.chain_spec(),
p2p_secret_key,
default_peers_path,
runtime.clone(),
)
.build(provider_factory.clone())
.start_network()

View File

@@ -46,14 +46,12 @@ impl<C: ChainSpecParser<ChainSpec: EthChainSpec + EthereumHardforks>> Command<C>
pub async fn execute<N: CliNodeTypes<ChainSpec = C::ChainSpec>, F, Comp>(
self,
components: F,
runtime: reth_tasks::Runtime,
) -> eyre::Result<()>
where
Comp: CliNodeComponents<N>,
F: FnOnce(Arc<C::ChainSpec>) -> Comp,
{
let Environment { provider_factory, config, data_dir: _ } =
self.env.init::<N>(AccessRights::RW, runtime)?;
let Environment { provider_factory, config, .. } = self.env.init::<N>(AccessRights::RW)?;
let target = self.command.unwind_target(provider_factory.clone())?;

View File

@@ -47,11 +47,6 @@ impl CliRunner {
self
}
/// Returns a clone of the underlying [`Runtime`](reth_tasks::Runtime).
pub fn runtime(&self) -> reth_tasks::Runtime {
self.runtime.clone()
}
/// Executes an async block on the runtime and blocks until completion.
pub fn block_on<F, T>(&self, fut: F) -> T
where

View File

@@ -19,7 +19,6 @@ reth-consensus.workspace = true
reth-primitives-traits.workspace = true
alloy-consensus.workspace = true
alloy-eips.workspace = true
alloy-primitives.workspace = true
[dev-dependencies]
alloy-primitives = { workspace = true, features = ["rand"] }

View File

@@ -2,7 +2,6 @@
use alloy_consensus::{BlockHeader as _, EMPTY_OMMER_ROOT_HASH};
use alloy_eips::{eip4844::DATA_GAS_PER_BLOB, eip7840::BlobParams};
use alloy_primitives::B256;
use reth_chainspec::{EthChainSpec, EthereumHardfork, EthereumHardforks};
use reth_consensus::ConsensusError;
use reth_primitives_traits::{
@@ -142,27 +141,6 @@ pub fn validate_block_pre_execution<B, ChainSpec>(
block: &SealedBlock<B>,
chain_spec: &ChainSpec,
) -> Result<(), ConsensusError>
where
B: Block,
ChainSpec: EthChainSpec + EthereumHardforks,
{
validate_block_pre_execution_with_tx_root(block, chain_spec, None)
}
/// Validate a block without regard for state using an optional pre-computed transaction root.
///
/// - Compares the ommer hash in the block header to the block body
/// - Compares the transactions root in the block header to the block body
/// - Pre-execution transaction validation
///
/// If `transaction_root` is provided, it is used instead of recomputing the transaction trie
/// root from the block body. The caller must ensure this value was derived from
/// `block.body().calculate_tx_root()`.
pub fn validate_block_pre_execution_with_tx_root<B, ChainSpec>(
block: &SealedBlock<B>,
chain_spec: &ChainSpec,
transaction_root: Option<B256>,
) -> Result<(), ConsensusError>
where
B: Block,
ChainSpec: EthChainSpec + EthereumHardforks,
@@ -170,14 +148,8 @@ where
post_merge_hardfork_fields(block, chain_spec)?;
// Check transaction root
let expected_transaction_root = block.header().transactions_root();
let calculated_transaction_root =
transaction_root.unwrap_or_else(|| block.body().calculate_tx_root());
if calculated_transaction_root != expected_transaction_root {
return Err(ConsensusError::BodyTransactionRootDiff(
GotExpected { got: calculated_transaction_root, expected: expected_transaction_root }
.into(),
))
if let Err(error) = block.ensure_transaction_root_valid() {
return Err(ConsensusError::BodyTransactionRootDiff(error.into()))
}
Ok(())
@@ -454,7 +426,7 @@ pub fn validate_against_parent_4844<H: BlockHeader>(
mod tests {
use super::*;
use alloy_consensus::{BlockBody, Header, TxEip4844};
use alloy_eips::{eip4844::DATA_GAS_PER_BLOB, eip4895::Withdrawals};
use alloy_eips::eip4895::Withdrawals;
use alloy_primitives::{Address, Bytes, Signature, U256};
use rand::Rng;
use reth_chainspec::ChainSpecBuilder;
@@ -535,66 +507,4 @@ mod tests {
// Test with custom larger limit - should pass
assert!(validate_header_extra_data(&header_33, 64).is_ok());
}
#[test]
fn precomputed_tx_root_correct_passes() {
let chain_spec = ChainSpecBuilder::mainnet().cancun_activated().build();
let transaction = mock_blob_tx(1, 1);
let tx_root = proofs::calculate_transaction_root(std::slice::from_ref(&transaction));
let header = Header {
base_fee_per_gas: Some(1337),
withdrawals_root: Some(proofs::calculate_withdrawals_root(&[])),
transactions_root: tx_root,
blob_gas_used: Some(DATA_GAS_PER_BLOB),
excess_blob_gas: Some(0),
..Default::default()
};
let body = BlockBody {
transactions: vec![transaction],
ommers: vec![],
withdrawals: Some(Withdrawals::default()),
};
let block = SealedBlock::seal_slow(alloy_consensus::Block { header, body });
// Some(correct_root) should pass just like None
assert!(
validate_block_pre_execution_with_tx_root(&block, &chain_spec, Some(tx_root)).is_ok()
);
assert!(validate_block_pre_execution_with_tx_root(&block, &chain_spec, None).is_ok());
}
#[test]
fn precomputed_tx_root_wrong_fails() {
let chain_spec = ChainSpecBuilder::mainnet().cancun_activated().build();
let transaction = mock_blob_tx(1, 1);
let tx_root = proofs::calculate_transaction_root(std::slice::from_ref(&transaction));
let header = Header {
base_fee_per_gas: Some(1337),
withdrawals_root: Some(proofs::calculate_withdrawals_root(&[])),
transactions_root: tx_root,
blob_gas_used: Some(DATA_GAS_PER_BLOB),
excess_blob_gas: Some(0),
..Default::default()
};
let body = BlockBody {
transactions: vec![transaction],
ommers: vec![],
withdrawals: Some(Withdrawals::default()),
};
let block = SealedBlock::seal_slow(alloy_consensus::Block { header, body });
let wrong_root = B256::repeat_byte(0xff);
assert!(matches!(
validate_block_pre_execution_with_tx_root(&block, &chain_spec, Some(wrong_root))
.unwrap_err(),
ConsensusError::BodyTransactionRootDiff(diff)
if diff.0.got == wrong_root && diff.0.expected == tx_root
));
}
}

Some files were not shown because too many files have changed in this diff Show More