Compare commits

..

1 Commits

Author SHA1 Message Date
Arsenii Kulikov
36bc20ca92 perf: decrease sparse trie task updates batch size 2026-03-10 13:31:25 +01:00
167 changed files with 3646 additions and 8980 deletions

View File

@@ -1,5 +0,0 @@
---
reth-trie-sparse: patch
---
Fixed a bug in `merge_subtrie_updates` where source insertions did not cancel destination removals (and vice versa), causing inconsistent trie updates accumulated across multiple `root()` calls without intermediate `take_updates()`. Added a test covering the cross-cancellation behavior.

View File

@@ -1,5 +0,0 @@
---
reth-tasks: patch
---
Added panic handler to all rayon thread pools that logs panics via `tracing::error` instead of aborting the process.

View File

@@ -1,5 +0,0 @@
---
reth-trie-sparse: patch
---
Refactored arena trie internals by adding a `BranchChildIdx::sibling()` helper, deduplicating `Index`/`NodeArena` type aliases, and replacing `is_empty()` with a `drop_root()` method. Fixed a bug where `cursor.pop()` was called before checking if the leaf was the root node, which could cause incorrect dirty-state propagation.

View File

@@ -1,5 +0,0 @@
---
reth-payload-builder: minor
---
Added observability metrics for payload resolve latency and new payload job creation latency to the payload builder service.

View File

@@ -1,6 +0,0 @@
---
reth-trie: patch
reth-trie-sparse: patch
---
Refactored test harness for sparse trie tests by extracting `TrieTestHarness` into a shared `reth-trie` test utility, replacing duplicated inline harness code across multiple test modules. Updated `proof_v2` return type to include an optional root hash, and converted `original_root` and `storage` from public fields to accessor methods.

View File

@@ -1,7 +0,0 @@
---
reth-cli-commands: minor
reth-node-core: minor
reth: patch
---
Made v2 storage the default for all new databases, deprecating the `--storage.v2` flag to a hidden no-op kept for backwards compatibility. Updated CLI reference docs to remove the now-hidden flag from all command help pages.

View File

@@ -1,7 +0,0 @@
---
reth-engine-tree: patch
reth-trie-sparse: patch
reth-tasks: patch
---
Offloaded deallocation of expensive proof node buffers to a persistent background thread (`Runtime::spawn_drop`) to avoid blocking state root computation or lock-holding code.

View File

@@ -1,5 +0,0 @@
---
reth-cli-commands: minor
---
Added `reth_version` field to `SnapshotManifest` to record the Reth version that produced a snapshot. The field is optional and populated automatically during manifest generation.

View File

@@ -1,9 +0,0 @@
---
reth-trie-sparse: minor
reth-engine-primitives: minor
reth-engine-tree: minor
reth-node-core: minor
reth-trie-common: patch
---
Added an arena-based sparse trie implementation (`ArenaParallelSparseTrie`) using `slotmap` arena allocation for node storage, enabling parallel subtrie mutation without per-node hashing overhead. Added `ConfigurableSparseTrie` enum to switch between the arena and hash-map implementations, and a `--engine.enable-arena-sparse-trie` CLI flag to opt in at runtime.

View File

@@ -20,11 +20,6 @@
# include dist directory, where the reth binary is located after compilation
!/dist
# include PGO build helper used by Dockerfile.depot
!/.github
!/.github/scripts
!/.github/scripts/build_pgo_bolt.sh
# include licenses
!LICENSE-*

View File

@@ -5,4 +5,3 @@ self-hosted-runner:
- depot-ubuntu-latest-4
- depot-ubuntu-latest-8
- depot-ubuntu-latest-16
- available

View File

@@ -1,276 +0,0 @@
#!/usr/bin/env python3
"""
Prometheus metrics proxy that fetches from a local reth node and
re-exposes with additional benchmark labels.
Reads labels from a JSON file (updated by local-reth-bench.sh between runs)
and injects them into every Prometheus metric line.
Returns empty 200 when reth is not running (clean Grafana gaps).
"""
import argparse
import ipaddress
import json
import subprocess
import sys
import time
from http.server import HTTPServer, BaseHTTPRequestHandler
from urllib.request import urlopen
from urllib.error import URLError
def read_labels(path):
    """Load the label mapping from the JSON file at *path*.

    The file is rewritten by the benchmark driver between runs, so it may be
    missing, unreadable, or caught half-written. Every such case, as well as
    a JSON document that is not an object, yields an empty dict so that
    callers can always iterate ``.items()`` on the result.
    """
    try:
        with open(path) as f:
            labels = json.load(f)
    except (OSError, ValueError):
        # OSError covers missing/unreadable files; ValueError covers
        # json.JSONDecodeError and undecodable bytes.
        return {}
    return labels if isinstance(labels, dict) else {}
def inject_labels(metrics_bytes, label_str, label_names):
"""Inject labels into Prometheus text format.
Operates on bytes and uses simple string ops instead of regex
for speed on large payloads (reth exposes thousands of metrics).
Skips injecting into lines that already contain any of the label names
to avoid duplicate labels (which Prometheus rejects).
"""
if not label_str:
return metrics_bytes
label_bytes = label_str.encode("utf-8")
# Pre-encode label names for fast duplicate detection
label_name_bytes = [n.encode("utf-8") for n in label_names]
out = []
for line in metrics_bytes.split(b"\n"):
# Skip comments and blank lines
if line.startswith(b"#") or not line:
out.append(line)
continue
brace = line.find(b"{")
space = line.find(b" ")
if space == -1:
# Malformed, pass through
out.append(line)
elif brace != -1 and brace < space:
# Has labels: metric{existing="val"} 123
close = line.find(b"}", brace)
if close == -1:
out.append(line)
continue
# Filter out labels that already exist in this line
existing = line[brace + 1:close]
inject = label_bytes
if existing:
for name in label_name_bytes:
if name + b"=" in existing:
# Rebuild inject string excluding this label
inject = _remove_label(inject, name)
if not inject:
out.append(line)
continue
if close == brace + 1:
# Empty braces: metric{} 123
out.append(line[:close] + inject + line[close:])
else:
out.append(line[:close] + b"," + inject + line[close:])
else:
# No labels: metric 123
out.append(line[:space] + b"{" + label_bytes + b"}" + line[space:])
return b"\n".join(out)
def _remove_label(label_bytes, name):
"""Remove a single label (name=\"...\") from a comma-separated label string."""
parts = []
for part in label_bytes.split(b","):
if not part.startswith(name + b"="):
parts.append(part)
return b",".join(parts)
def build_label_str(labels):
    """Pre-format the label injection string: key1="val1",key2="val2"

    Pairs are emitted in sorted key order. Backslash, double quote and
    newline in values are escaped, because the Prometheus text format
    requires it and an unescaped quote would corrupt every injected line.
    Returns "" for an empty or missing mapping.
    """
    if not labels:
        return ""

    def _escape(value):
        # Backslash must be escaped first so later escapes are not doubled.
        return (
            str(value)
            .replace("\\", "\\\\")
            .replace('"', '\\"')
            .replace("\n", "\\n")
        )

    return ",".join(f'{k}="{_escape(v)}"' for k, v in sorted(labels.items()))
def build_elapsed_gauge(labels):
    """Render a ``bench_elapsed_seconds`` gauge as Prometheus text bytes.

    The gauge value is the wall-clock time since ``labels["run_start_epoch"]``.
    Returns ``b""`` when that key is absent/empty or not convertible to float.
    The bookkeeping keys ``run_start_epoch`` and ``reference_epoch`` are not
    emitted as labels on the gauge.
    """
    started_at = labels.get("run_start_epoch")
    if not started_at:
        return b""
    try:
        seconds = time.time() - float(started_at)
    except (ValueError, TypeError):
        return b""

    # Everything except the internal bookkeeping keys becomes a gauge label.
    hidden = ("run_start_epoch", "reference_epoch")
    visible = {}
    for key, value in labels.items():
        if key not in hidden:
            visible[key] = value
    rendered = build_label_str(visible)

    text = "".join([
        "# HELP bench_elapsed_seconds Seconds since benchmark run started\n",
        "# TYPE bench_elapsed_seconds gauge\n",
        "bench_elapsed_seconds{" + rendered + "} " + format(seconds, ".1f") + "\n",
    ])
    return text.encode("utf-8")
def compute_timestamp_ms(labels):
    """Compute a synthetic timestamp so all runs share a common time origin.

    Returns the timestamp in milliseconds, or None if not enough info.

    Uses: reference_epoch + (now - run_start_epoch), so all runs overlay at
    the same Grafana time range. None is also returned when either epoch
    cannot be converted to a finite number.
    """
    ref = labels.get("reference_epoch")
    start = labels.get("run_start_epoch")
    if not ref or not start:
        return None
    try:
        elapsed = time.time() - float(start)
        return int((float(ref) + elapsed) * 1000)
    except (ValueError, TypeError, OverflowError):
        # int() raises ValueError for NaN and OverflowError for +/-inf.
        return None
def inject_timestamps(metrics_bytes, timestamp_ms):
    """Suffix every sample line with ``" <timestamp_ms>"``.

    Prometheus text format allows ``metric{labels} value [timestamp_ms]``.
    Comment lines (starting with ``#``) and empty lines are left untouched.
    When *timestamp_ms* is None the payload is returned unchanged.
    """
    if timestamp_ms is None:
        return metrics_bytes
    suffix = b" " + str(timestamp_ms).encode("utf-8")
    return b"\n".join(
        row if (not row or row.startswith(b"#")) else row + suffix
        for row in metrics_bytes.split(b"\n")
    )
class MetricsHandler(BaseHTTPRequestHandler):
    """HTTP handler that proxies one upstream scrape per GET request.

    Reads ``self.server.upstream`` (URL to fetch) and
    ``self.server.labels_file`` (JSON label file), both set on the server
    object by ``main``.
    """

    # Use HTTP/1.1 so Content-Length is respected and Prometheus
    # doesn't have to rely on connection close to detect end of body.
    protocol_version = "HTTP/1.1"

    def do_GET(self):
        """Fetch upstream metrics, add labels/gauge/timestamps, and reply."""
        src = self.client_address[0]
        try:
            # Context manager closes the upstream connection on every path.
            with urlopen(self.server.upstream, timeout=2) as resp:
                metrics = resp.read()
        except (URLError, ConnectionError, OSError):
            # reth not running — return empty 200
            self._send(b"")
            return
        all_labels = read_labels(self.server.labels_file)
        # Internal keys — not injected as Prometheus labels
        internal = ("run_start_epoch", "reference_epoch")
        labels = {k: v for k, v in all_labels.items() if k not in internal}
        label_str = build_label_str(labels)
        label_names = sorted(labels.keys())
        t0 = time.monotonic()
        result = inject_labels(metrics, label_str, label_names)
        gauge = build_elapsed_gauge(all_labels)
        if gauge and result and not result.endswith(b"\n"):
            # Keep the gauge's first line from being glued onto the last
            # upstream sample when the upstream body lacks a final newline.
            result += b"\n"
        result += gauge
        ts_ms = compute_timestamp_ms(all_labels)
        result = inject_timestamps(result, ts_ms)
        dt = time.monotonic() - t0
        self._send(result)
        print(f" scrape from {src}: {len(metrics)} -> {len(result)} bytes, "
              f"inject {dt*1000:.1f}ms", flush=True)

    def _send(self, body):
        """Send *body* as a 200 text/plain response and close the connection."""
        self.send_response(200)
        self.send_header("Content-Type", "text/plain; version=0.0.4")
        self.send_header("Content-Length", str(len(body)))
        self.send_header("Connection", "close")
        self.end_headers()
        if body:
            self.wfile.write(body)

    def log_message(self, format, *args):
        pass  # suppress per-request logging
def resolve_bind_address(subnet_cidr):
    """Find the local IP address that belongs to the given subnet.

    Uses ``ip -j addr show`` to enumerate interfaces and returns the first
    address that falls within *subnet_cidr* (e.g. ``10.10.0.0/24``).

    Prints an error to stderr and exits with status 1 when the subnet is not
    a valid CIDR, the interfaces cannot be enumerated, or no address matches.
    """
    try:
        network = ipaddress.ip_network(subnet_cidr, strict=False)
    except ValueError as exc:
        # Report bad input the same way as the other failure modes instead
        # of surfacing a traceback.
        print(f"Error: invalid subnet {subnet_cidr!r}: {exc}", file=sys.stderr)
        sys.exit(1)
    try:
        result = subprocess.run(
            ["ip", "-j", "addr", "show"],
            capture_output=True, text=True, check=True,
        )
        interfaces = json.loads(result.stdout)
    except (subprocess.CalledProcessError, FileNotFoundError, json.JSONDecodeError) as exc:
        print(f"Error: cannot enumerate interfaces: {exc}", file=sys.stderr)
        sys.exit(1)
    for iface in interfaces:
        for addr_info in iface.get("addr_info", []):
            try:
                addr = ipaddress.ip_address(addr_info["local"])
            except (KeyError, ValueError):
                continue
            if addr in network:
                return str(addr)
    print(f"Error: no interface address found in subnet {subnet_cidr}", file=sys.stderr)
    sys.exit(1)
def main():
    """Parse CLI options, pick the bind address, and serve forever.

    ``--bind`` and ``--subnet`` are mutually exclusive; with neither given
    the proxy binds to 0.0.0.0.
    """
    cli = argparse.ArgumentParser(description="Prometheus metrics proxy with label injection")
    cli.add_argument(
        "--labels",
        default="/tmp/bench-metrics-labels.json",
        help="Path to JSON file with labels to inject (default: /tmp/bench-metrics-labels.json)",
    )
    cli.add_argument(
        "--upstream",
        default="http://127.0.0.1:9100/",
        help="Upstream reth metrics URL (default: http://127.0.0.1:9100/)",
    )
    where = cli.add_mutually_exclusive_group()
    where.add_argument(
        "--bind",
        default=None,
        help="Address to bind the proxy (default: 0.0.0.0)",
    )
    where.add_argument(
        "--subnet",
        default=None,
        help="Auto-detect bind address from a local interface in this subnet (e.g. 10.10.0.0/24)",
    )
    cli.add_argument(
        "--port",
        type=int,
        default=9090,
        help="Port to bind the proxy (default: 9090)",
    )
    opts = cli.parse_args()

    # --subnet wins when given; otherwise an explicit --bind, else all interfaces.
    if opts.subnet:
        host = resolve_bind_address(opts.subnet)
    else:
        host = opts.bind or "0.0.0.0"

    httpd = HTTPServer((host, opts.port), MetricsHandler)
    # The handler reads these two attributes off the server object.
    httpd.upstream = opts.upstream
    httpd.labels_file = opts.labels

    banner = [
        f"bench-metrics-proxy listening on {host}:{opts.port}",
        f" upstream: {opts.upstream}",
        f" labels: {opts.labels}",
    ]
    print("\n".join(banner), flush=True)
    httpd.serve_forever()


if __name__ == "__main__":
    main()

View File

@@ -22,22 +22,6 @@ MODE="$1"
SOURCE_DIR="$2"
COMMIT="$3"
# Tracy support: when BENCH_TRACY is "on" or "full", add Tracy cargo features
# and frame pointers for accurate stack traces.
EXTRA_FEATURES=""
EXTRA_RUSTFLAGS=""
if [ "${BENCH_TRACY:-off}" != "off" ]; then
EXTRA_FEATURES="tracy,tracy-client/ondemand"
EXTRA_RUSTFLAGS=" -C force-frame-pointers=yes"
fi
# Cache suffix: hash of features+rustflags so different build configs get separate cache entries
if [ -n "$EXTRA_FEATURES" ] || [ -n "$EXTRA_RUSTFLAGS" ]; then
BUILD_SUFFIX="-$(echo "${EXTRA_FEATURES}${EXTRA_RUSTFLAGS}" | sha256sum | cut -c1-12)"
else
BUILD_SUFFIX=""
fi
# Verify a cached reth binary was built from the expected commit.
# `reth --version` outputs "Commit SHA: <full-sha>" on its own line.
verify_binary() {
@@ -58,7 +42,7 @@ verify_binary() {
case "$MODE" in
baseline|main)
BUCKET="minio/reth-binaries/${COMMIT}${BUILD_SUFFIX}"
BUCKET="minio/reth-binaries/${COMMIT}"
mkdir -p "${SOURCE_DIR}/target/profiling"
CACHE_VALID=false
@@ -75,23 +59,14 @@ case "$MODE" in
if [ "$CACHE_VALID" = false ]; then
echo "Building baseline (${COMMIT}) from source..."
cd "${SOURCE_DIR}"
FEATURES_ARG=""
WORKSPACE_ARG=""
if [ -n "$EXTRA_FEATURES" ]; then
# --workspace is needed for cross-package feature syntax (tracy-client/ondemand)
FEATURES_ARG="--features ${EXTRA_FEATURES}"
WORKSPACE_ARG="--workspace"
fi
# shellcheck disable=SC2086
RUSTFLAGS="-C target-cpu=native${EXTRA_RUSTFLAGS}" \
cargo build --profile profiling --bin reth $WORKSPACE_ARG $FEATURES_ARG
cargo build --profile profiling --bin reth
$MC cp target/profiling/reth "${BUCKET}/reth"
fi
;;
feature|branch)
BRANCH_SHA="${4:-$COMMIT}"
BUCKET="minio/reth-binaries/${BRANCH_SHA}${BUILD_SUFFIX}"
BUCKET="minio/reth-binaries/${BRANCH_SHA}"
CACHE_VALID=false
if $MC stat "${BUCKET}/reth" &>/dev/null && $MC stat "${BUCKET}/reth-bench" &>/dev/null; then
@@ -110,14 +85,7 @@ case "$MODE" in
echo "Building feature (${COMMIT}) from source..."
cd "${SOURCE_DIR}"
rustup show active-toolchain || rustup default stable
if [ -n "$EXTRA_FEATURES" ]; then
# Can't use `make profiling` when adding features; build explicitly
# --workspace is needed for cross-package feature syntax (tracy-client/ondemand)
RUSTFLAGS="-C target-cpu=native${EXTRA_RUSTFLAGS}" \
cargo build --profile profiling --workspace --bin reth --features "${EXTRA_FEATURES}"
else
make profiling
fi
make profiling
make install-reth-bench
$MC cp target/profiling/reth "${BUCKET}/reth"
$MC cp "$(which reth-bench)" "${BUCKET}/reth-bench"

View File

@@ -1,581 +0,0 @@
#!/usr/bin/env bash
#
# local-reth-bench.sh — Run the reth Engine API benchmark locally.
#
# Replicates the CI bench.yml workflow (build, snapshot, system tuning,
# interleaved B-F-F-B execution, summary, charts) without any GitHub
# Actions glue (no PR comments, no artifact upload, no Slack).
#
# Usage:
# local-reth-bench.sh <baseline-ref> <feature-ref> [options]
#
# Options:
# --blocks N Number of blocks to benchmark (default: 500)
# --warmup N Number of warmup blocks (default: 100)
# --cores N Limit reth to N CPU cores, 0 = all available (default: 0)
# --samply Enable samply profiling
# --tracy MODE Tracy profiling: off, on, full (default: off)
# --tracy-filter F Tracy tracing filter (default: debug)
# --no-tune Skip system tuning (useful on dev machines / macOS)
#
# Requires: the reth repo at RETH_REPO (default: ~/reth)
#
# Dependencies (install before first run):
# mc (MinIO client), schelk, cpupower, taskset, stdbuf, python3, curl,
# make, uv, pzstd, jq, Rust toolchain (cargo/rustup)
#
# The script delegates to the existing bench-reth-*.sh scripts in the reth
# repo for the actual build, snapshot, and run steps.
set -euo pipefail
# ── PATH ──────────────────────────────────────────────────────────────
# Ensure cargo and user-local bins (mc, uv) are visible
export PATH="$HOME/.local/bin:$HOME/.cargo/bin:$PATH"
# ── Defaults ──────────────────────────────────────────────────────────
RETH_REPO="${RETH_REPO:-$HOME/reth}"
BLOCKS=500
WARMUP=100
CORES=0
SAMPLY=false
TRACY="off"
TRACY_FILTER="debug"
TUNE=true
BASELINE_REF=""
FEATURE_REF=""
# ── Parse arguments ──────────────────────────────────────────────────
# Print the command-line synopsis to stdout and terminate the script.
# Always exits with status 1, including when reached via --help.
usage() {
cat <<EOF
Usage: $(basename "$0") <baseline-ref> <feature-ref> [options]
Options:
--blocks N Number of blocks to benchmark (default: 500)
--warmup N Number of warmup blocks (default: 100)
--cores N Limit reth to N CPU cores (default: 0 = all)
--samply Enable samply profiling
--tracy MODE Tracy profiling: off, on, full (default: off)
on = tracing only (lower overhead)
full = tracing + CPU sampling (higher overhead)
--tracy-filter F Tracy tracing filter (default: debug)
--no-tune Skip system tuning
EOF
exit 1
}
# Walk the argument list: options may appear anywhere; the first two
# non-option words are the baseline ref and the feature ref, in that order.
while [[ $# -gt 0 ]]; do
  case "$1" in
    --blocks|--warmup|--cores|--tracy|--tracy-filter)
      # These options take a value. Without this check a trailing option
      # would abort under `set -u` with an opaque "unbound variable" error.
      if [[ $# -lt 2 ]]; then
        echo "Error: option $1 requires a value."
        usage
      fi
      case "$1" in
        --blocks) BLOCKS="$2" ;;
        --warmup) WARMUP="$2" ;;
        --cores) CORES="$2" ;;
        --tracy) TRACY="$2" ;;
        --tracy-filter) TRACY_FILTER="$2" ;;
      esac
      shift 2
      ;;
    --samply) SAMPLY=true; shift ;;
    --no-tune) TUNE=false; shift ;;
    --help|-h) usage ;;
    -*) echo "Unknown option: $1"; usage ;;
    *)
      if [ -z "$BASELINE_REF" ]; then
        BASELINE_REF="$1"
      elif [ -z "$FEATURE_REF" ]; then
        FEATURE_REF="$1"
      else
        echo "Unexpected argument: $1"; usage
      fi
      shift
      ;;
  esac
done
if [ -z "$BASELINE_REF" ] || [ -z "$FEATURE_REF" ]; then
echo "Error: both <baseline-ref> and <feature-ref> are required."
usage
fi
# Validate --tracy value
case "$TRACY" in
off|on|full) ;;
*) echo "Error: --tracy must be off, on, or full (got: $TRACY)"; usage ;;
esac
# Samply + tracy=full are mutually exclusive (both use perf sampling)
if [ "$SAMPLY" = "true" ] && [ "$TRACY" = "full" ]; then
echo "Warning: samply and tracy=full both use perf sampling; downgrading tracy to 'on'."
TRACY="on"
fi
# ── Check dependencies ───────────────────────────────────────────────
missing=()
for cmd in mc schelk cpupower taskset stdbuf python3 curl make uv pzstd jq cargo; do
command -v "$cmd" &>/dev/null || missing+=("$cmd")
done
if [ ${#missing[@]} -gt 0 ]; then
echo "Error: missing required tools: ${missing[*]}"
echo "See the CI 'Install dependencies' step in .github/workflows/bench.yml for install instructions."
exit 1
fi
if [ "$TRACY" != "off" ]; then
if ! command -v tracy-capture &>/dev/null; then
echo "Error: tracy-capture is required for --tracy $TRACY"
exit 1
fi
fi
# Ensure tools that run via sudo are in a sudo-visible path.
# The bench scripts use `sudo schelk` / `sudo samply` but cargo installs
# them to ~/.cargo/bin which sudo's secure_path doesn't include.
for cmd in schelk samply; do
if command -v "$cmd" &>/dev/null && ! sudo sh -c "command -v $cmd" &>/dev/null; then
echo "Installing $cmd to /usr/local/bin (needed for sudo)..."
sudo install "$(command -v "$cmd")" /usr/local/bin/
fi
done
if [ ! -d "$RETH_REPO/.git" ]; then
echo "Error: RETH_REPO=$RETH_REPO is not a git repository."
echo "Set RETH_REPO or clone reth to ~/reth"
exit 1
fi
# ── Resolve paths ────────────────────────────────────────────────────
SELF_DIR="$(cd "$(dirname "$0")" && pwd)"
SCRIPTS_DIR="${RETH_REPO}/.github/scripts"
BENCH_WORK_DIR="${RETH_REPO}/../bench-work-$(date +%Y%m%d-%H%M%S)"
BASELINE_SRC="${RETH_REPO}/../reth-baseline"
FEATURE_SRC="${RETH_REPO}/../reth-feature"
mkdir -p "$BENCH_WORK_DIR"
BENCH_WORK_DIR="$(cd "$BENCH_WORK_DIR" && pwd)"
# ── Global cleanup trap (restores system tuning on any exit) ─────────
TUNING_APPLIED=false
CSTATE_PID=
METRICS_PROXY_PID=
# EXIT-trap handler: stop the metrics proxy and undo the parts of system
# tuning that this script can safely undo.
#
# Only the C-state holder process and the stopped services are reverted.
# Governor, turbo, swap, ASLR, SMT and THP changes are NOT reverted here.
cleanup_global() {
  [ -n "$METRICS_PROXY_PID" ] && kill "$METRICS_PROXY_PID" 2>/dev/null || true
  if [ "$TUNING_APPLIED" = true ]; then
    echo
    echo "▸ Restoring system settings..."
    if [ -n "$CSTATE_PID" ]; then
      # The holder was launched through sudo, so a plain kill may be denied;
      # fall back to sudo before giving up.
      kill "$CSTATE_PID" 2>/dev/null || sudo kill "$CSTATE_PID" 2>/dev/null || true
    fi
    # Restart the same set of services that the tuning step stops.
    sudo systemctl start irqbalance cron atd unattended-upgrades snapd 2>/dev/null || true
    echo " System settings restored."
  fi
}
trap cleanup_global EXIT
echo "═══════════════════════════════════════════════════════════"
echo " reth local benchmark"
echo "═══════════════════════════════════════════════════════════"
echo " Baseline ref : $BASELINE_REF"
echo " Feature ref : $FEATURE_REF"
echo " Blocks : $BLOCKS"
echo " Warmup : $WARMUP"
echo " Cores : $CORES"
echo " Samply : $SAMPLY"
echo " Tracy : $TRACY"
echo " Tracy filter : $TRACY_FILTER"
echo " System tune : $TUNE"
echo " Work dir : $BENCH_WORK_DIR"
echo " Reth repo : $RETH_REPO"
echo "═══════════════════════════════════════════════════════════"
echo
# Enable sccache if available (matches CI's RUSTC_WRAPPER=sccache)
if command -v sccache &>/dev/null; then
export RUSTC_WRAPPER="sccache"
fi
# Export env vars expected by the bench-reth-*.sh scripts
export BENCH_BLOCKS="$BLOCKS"
export BENCH_WARMUP_BLOCKS="$WARMUP"
export BENCH_CORES="$CORES"
export BENCH_SAMPLY="$SAMPLY"
export BENCH_TRACY="$TRACY"
export BENCH_TRACY_FILTER="$TRACY_FILTER"
export BENCH_WORK_DIR
export SCHELK_MOUNT="${SCHELK_MOUNT:-/reth-bench}"
export BENCH_RPC_URL="${BENCH_RPC_URL:-https://ethereum.reth.rs/rpc}"
export BENCH_METRICS_ADDR="127.0.0.1:9100"
# ── Step 1: Resolve refs to full SHAs ────────────────────────────────
echo "▸ Resolving git refs..."
cd "$RETH_REPO"
# Resolve a git ref (branch, tag or SHA) to a full commit SHA on stdout.
#
# Tries the ref as given, then `origin/<ref>`. On failure prints an error to
# stderr and exits 1 (callers use command substitution, so that ends the
# subshell and `set -e` aborts the script at the assignment).
#
# `--verify --quiet` matters: a plain `git rev-parse <unknown>` echoes the
# unknown name to stdout before failing, which would be captured together
# with the SHA printed by the fallback. `^{commit}` peels annotated tags so
# the result is always a commit.
resolve_ref() {
  local ref="$1" candidate sha
  git fetch origin "$ref" --quiet 2>/dev/null || true
  for candidate in "$ref" "origin/$ref"; do
    if sha="$(git rev-parse --verify --quiet "${candidate}^{commit}")"; then
      echo "$sha"
      return 0
    fi
  done
  echo "Error: cannot resolve ref '$ref'" >&2
  exit 1
}
BASELINE_SHA="$(resolve_ref "$BASELINE_REF")"
FEATURE_SHA="$(resolve_ref "$FEATURE_REF")"
echo " Baseline SHA : $BASELINE_SHA"
echo " Feature SHA : $FEATURE_SHA"
echo
# ── Step 2: Prepare source directories ───────────────────────────────
echo "▸ Preparing source directories..."
# Make sure a working copy exists at $1 and is checked out at revision $2.
# A missing directory is cloned from $RETH_REPO (with submodules); an
# existing one gets a best-effort fetch of the revision. The checkout is
# forced and submodules are synced afterwards.
prepare_source() {
  local checkout_dir="$1" rev="$2"
  if [ ! -d "$checkout_dir" ]; then
    git clone --recurse-submodules "$RETH_REPO" "$checkout_dir"
  else
    # Fetch failures are tolerated; the checkout below is the real check.
    git -C "$checkout_dir" fetch origin "$rev" 2>/dev/null || true
  fi
  git -C "$checkout_dir" checkout "$rev" --force
  git -C "$checkout_dir" submodule update --init --recursive
}
prepare_source "$BASELINE_SRC" "$BASELINE_SHA"
prepare_source "$FEATURE_SRC" "$FEATURE_SHA"
BASELINE_SRC="$(cd "$BASELINE_SRC" && pwd)"
FEATURE_SRC="$(cd "$FEATURE_SRC" && pwd)"
echo " Baseline src : $BASELINE_SRC"
echo " Feature src : $FEATURE_SRC"
echo
# ── Step 3: Check / download snapshot ────────────────────────────────
echo "▸ Checking snapshot..."
cd "$RETH_REPO"
SNAPSHOT_NEEDED=false
if ! "${SCRIPTS_DIR}/bench-reth-snapshot.sh" --check; then
SNAPSHOT_NEEDED=true
echo " Snapshot needs update."
else
echo " Snapshot is up-to-date."
fi
echo
# ── Step 4: Build binaries (+ snapshot download) in parallel ─────────
echo "▸ Building binaries (parallel)..."
cd "$RETH_REPO"
FAIL=0
"${SCRIPTS_DIR}/bench-reth-build.sh" baseline "$BASELINE_SRC" "$BASELINE_SHA" &
PID_BASELINE=$!
"${SCRIPTS_DIR}/bench-reth-build.sh" feature "$FEATURE_SRC" "$FEATURE_SHA" &
PID_FEATURE=$!
PID_SNAPSHOT=
if [ "$SNAPSHOT_NEEDED" = "true" ]; then
echo " Also downloading snapshot in parallel..."
"${SCRIPTS_DIR}/bench-reth-snapshot.sh" &
PID_SNAPSHOT=$!
fi
wait $PID_BASELINE || FAIL=1
wait $PID_FEATURE || FAIL=1
[ -n "$PID_SNAPSHOT" ] && { wait $PID_SNAPSHOT || FAIL=1; }
if [ $FAIL -ne 0 ]; then
echo "Error: one or more parallel tasks failed (builds / snapshot)"
exit 1
fi
echo " Binaries built successfully."
echo
# ── Step 5: System tuning (optional) ────────────────────────────────
# Apply best-effort system tuning for stable measurements. Every individual
# knob is allowed to fail (missing sysfs file, unsupported hardware).
if [ "$TUNE" = "true" ]; then
  echo "▸ Applying system tuning..."
  sudo cpupower frequency-set -g performance 2>/dev/null || true
  # Disable turbo boost (Intel + AMD)
  echo 1 | sudo tee /sys/devices/system/cpu/intel_pstate/no_turbo 2>/dev/null || true
  echo 0 | sudo tee /sys/devices/system/cpu/cpufreq/boost 2>/dev/null || true
  sudo swapoff -a 2>/dev/null || true
  echo 0 | sudo tee /proc/sys/kernel/randomize_va_space 2>/dev/null || true
  # Disable SMT (hyperthreading): take every CPU offline that is not the
  # first entry of its own thread_siblings_list.
  for cpu in /sys/devices/system/cpu/cpu*/topology/thread_siblings_list; do
    [ -f "$cpu" ] || continue
    first=$(cut -d, -f1 < "$cpu" | cut -d- -f1)
    current=$(echo "$cpu" | grep -o 'cpu[0-9]*' | grep -o '[0-9]*')
    if [ "$current" != "$first" ]; then
      echo 0 | sudo tee "/sys/devices/system/cpu/cpu${current}/online" 2>/dev/null || true
    fi
  done
  echo " Online CPUs: $(nproc)"
  # Disable transparent huge pages
  for p in /sys/kernel/mm/transparent_hugepage /sys/kernel/mm/transparent_hugepages; do
    if [ -d "$p" ]; then
      echo never | sudo tee "$p/enabled" 2>/dev/null || true
      echo never | sudo tee "$p/defrag" 2>/dev/null || true
      break
    fi
  done
  # Prevent deep C-states: hold /dev/cpu_dma_latency open with four zero
  # bytes written to it. POSIX printf with octal escapes is used because
  # `echo -ne "\x00"` is not portable: when sh is dash it prints "-ne" and
  # the escapes literally, so the write is not the intended four bytes.
  sudo sh -c 'exec 3<>/dev/cpu_dma_latency; printf "\000\000\000\000" >&3; sleep infinity' &
  CSTATE_PID=$!
  # Pin IRQs to core 0
  for irq in /proc/irq/*/smp_affinity_list; do
    echo 0 | sudo tee "$irq" 2>/dev/null || true
  done
  # Stop noisy background services
  sudo systemctl stop irqbalance cron atd unattended-upgrades snapd 2>/dev/null || true
  TUNING_APPLIED=true
  # Log environment for reproducibility (matches CI)
  echo " === Benchmark environment ==="
  echo " Kernel : $(uname -r)"
  lscpu | grep -E 'Model name|CPU\(s\)|MHz|NUMA' | sed 's/^/ /'
  echo " Governor : $(cat /sys/devices/system/cpu/cpu0/cpufreq/scaling_governor 2>/dev/null || echo unknown)"
  echo " Freq : $(cat /sys/devices/system/cpu/cpu0/cpufreq/scaling_cur_freq 2>/dev/null || echo unknown)"
  echo " THP : $(cat /sys/kernel/mm/transparent_hugepage/enabled 2>/dev/null || cat /sys/kernel/mm/transparent_hugepages/enabled 2>/dev/null || echo unknown)"
  free -h | sed 's/^/ /'
  echo " System tuning applied."
  echo
fi
# ── Step 5b: Tracefs mount (tracy=full only) ─────────────────────────
if [ "$TRACY" = "full" ] && [ "$(uname)" = "Linux" ]; then
echo "▸ Mounting tracefs for Tracy full mode..."
sudo mount -t tracefs tracefs /sys/kernel/tracing -o mode=755 2>/dev/null || true
fi
# ── Tracy upload & viewer helpers ────────────────────────────────────
TRACY_VIEWER_BASE="${TRACY_VIEWER_BASE:-}"
# Print the viewer link for a profile URL ($1): TRACY_VIEWER_BASE followed by
# a `profile_url` query parameter holding the fully percent-encoded URL.
# Prints an empty line when TRACY_VIEWER_BASE is unset or empty.
tracy_viewer_url() {
  local target="$1" quoted
  if [ -n "$TRACY_VIEWER_BASE" ]; then
    # safe='' makes python encode '/' and ':' as well.
    quoted=$(python3 -c "import urllib.parse, sys; print(urllib.parse.quote(sys.argv[1], safe=''))" "$target")
    echo "${TRACY_VIEWER_BASE}?profile_url=${quoted}"
  else
    echo ""
  fi
}
# Upload the Tracy profile of one run to object storage via `mc`.
#
# Args: $1 run label, $2 run output directory, $3 commit SHA of the binary.
# Reads TRACY_BUCKET, MC_ALIAS and TRACY_MINIO_URL (each with a default).
# On success writes tracy_url.txt (and tracy_viewer_url.txt when a viewer
# base is configured) into the output directory and deletes the local
# profile. A failed upload is non-fatal and leaves the local profile in
# place, since it is then the only copy.
upload_tracy() {
  local label="$1" output_dir="$2" sha="$3"
  local tracy_file="$output_dir/tracy-profile.tracy"
  if [ ! -f "$tracy_file" ]; then
    echo " Tracy: no profile found, skipping upload."
    return
  fi
  local timestamp short_sha remote_name bucket mc_alias
  timestamp=$(date +%Y%m%d-%H%M%S)
  short_sha="${sha:0:7}"
  remote_name="${label}-${short_sha}-${timestamp}.tracy"
  bucket="${TRACY_BUCKET:-tracy-profiles}"
  mc_alias="${MC_ALIAS:-minio}"
  local minio_base="${TRACY_MINIO_URL:-http://minio.minio.svc.cluster.local:9000}"
  echo " Tracy: uploading profile..."
  if ! mc cp "$tracy_file" "${mc_alias}/${bucket}/${remote_name}"; then
    echo " Tracy: upload failed (non-fatal)."
    echo " Tracy: profile kept at $tracy_file"
    return
  fi
  local url="${minio_base}/${bucket}/${remote_name}"
  echo "$url" > "$output_dir/tracy_url.txt"
  local viewer
  viewer=$(tracy_viewer_url "$url")
  if [ -n "$viewer" ]; then
    echo "$viewer" > "$output_dir/tracy_viewer_url.txt"
    echo " Tracy: uploaded → $viewer"
  else
    echo " Tracy: uploaded → $url"
  fi
  # Delete large profile to free disk (only once it is safely uploaded)
  rm -f "$tracy_file"
}
# ── Step 6: Pre-flight cleanup ───────────────────────────────────────
echo "▸ Pre-flight cleanup..."
pkill -f bench-metrics-proxy 2>/dev/null || true
sudo pkill -9 reth 2>/dev/null || true
sleep 1
if mountpoint -q "$SCHELK_MOUNT" 2>/dev/null; then
sudo umount -l "$SCHELK_MOUNT" 2>/dev/null || true
sudo schelk recover -y 2>/dev/null || true
fi
echo
# ── Step 7: Interleaved benchmark runs (B-F-F-B) ────────────────────
# This ordering reduces systematic bias from thermal drift and cache warming.
BASELINE_BIN="${BASELINE_SRC}/target/profiling/reth"
FEATURE_BIN="${FEATURE_SRC}/target/profiling/reth"
# Start metrics proxy (reth → label injection → Prometheus)
LABELS_FILE="/tmp/bench-metrics-labels.json"
echo '{}' > "$LABELS_FILE"
METRICS_SUBNET="${METRICS_SUBNET:-10.10.0.0/24}"
METRICS_PORT="${METRICS_PORT:-9090}"
python3 "${SELF_DIR}/bench-metrics-proxy.py" \
--labels "$LABELS_FILE" \
--upstream "http://${BENCH_METRICS_ADDR}/" \
--subnet "$METRICS_SUBNET" \
--port "$METRICS_PORT" &
METRICS_PROXY_PID=$!
echo "▸ Metrics proxy started (PID $METRICS_PROXY_PID) on subnet ${METRICS_SUBNET}, port ${METRICS_PORT}"
# Unique benchmark ID: local-<timestamp> for local runs, ci-<run_id> for CI
BENCH_ID="local-$(basename "$BENCH_WORK_DIR" | sed 's/bench-work-//')"
# Reference epoch: shared time origin so all runs overlay in Grafana.
# The proxy maps each run's elapsed time onto this common origin.
BENCH_REFERENCE_EPOCH=$(date +%s)
# Record the labels for the run that is about to start in $LABELS_FILE and
# set LAST_RUN_START (epoch seconds) for later use by the script.
#
# Args: $1 run label, $2 run type, $3 git ref as given by the user, $4 SHA.
# jq builds the JSON so that quotes or other special characters in a ref are
# escaped correctly, and the file is replaced with a rename so a concurrent
# reader never sees a half-written document.
write_labels() {
  local run_label="$1" run_type="$2" ref="$3" sha="$4" tmp_file
  LAST_RUN_START=$(date +%s)
  tmp_file="$(mktemp "${LABELS_FILE}.XXXXXX")"
  jq -cn \
    --arg benchmark_run "$run_label" \
    --arg run_type "$run_type" \
    --arg git_ref "$ref" \
    --arg bench_sha "$sha" \
    --arg benchmark_id "$BENCH_ID" \
    --arg run_start_epoch "$LAST_RUN_START" \
    --arg reference_epoch "$BENCH_REFERENCE_EPOCH" \
    '{benchmark_run: $benchmark_run, run_type: $run_type, git_ref: $git_ref, bench_sha: $bench_sha, benchmark_id: $benchmark_id, run_start_epoch: $run_start_epoch, reference_epoch: $reference_epoch}' \
    > "$tmp_file"
  # mktemp creates the file 0600; restore the usual world-readable mode.
  chmod 644 "$tmp_file"
  mv -f "$tmp_file" "$LABELS_FILE"
}
# Execute one benchmark run through bench-reth-run.sh from inside $RETH_REPO.
# Args: $1 run label, $2 path to the reth binary, $3 output directory.
# When taskset is installed the runner script is started with `taskset -c 0`.
run_bench() {
  local run_name="$1" reth_bin="$2" out_dir="$3"
  local runner="${SCRIPTS_DIR}/bench-reth-run.sh"
  echo "▸ Running benchmark: ${run_name}..."
  cd "$RETH_REPO"
  if ! command -v taskset &>/dev/null; then
    "$runner" "$run_name" "$reth_bin" "$out_dir"
  else
    taskset -c 0 "$runner" "$run_name" "$reth_bin" "$out_dir"
  fi
  echo "${run_name} complete."
  echo
}
write_labels "baseline-1" "baseline" "$BASELINE_REF" "$BASELINE_SHA"
run_bench "baseline-1" "$BASELINE_BIN" "$BENCH_WORK_DIR/baseline-1"
write_labels "feature-1" "feature" "$FEATURE_REF" "$FEATURE_SHA"
run_bench "feature-1" "$FEATURE_BIN" "$BENCH_WORK_DIR/feature-1"
write_labels "feature-2" "feature" "$FEATURE_REF" "$FEATURE_SHA"
run_bench "feature-2" "$FEATURE_BIN" "$BENCH_WORK_DIR/feature-2"
write_labels "baseline-2" "baseline" "$BASELINE_REF" "$BASELINE_SHA"
run_bench "baseline-2" "$BASELINE_BIN" "$BENCH_WORK_DIR/baseline-2"
# ── Compute Grafana URL ──────────────────────────────────────────────
GRAFANA_BASE_URL="https://tempoxyz.grafana.net/d/reth-bench-ghr/reth-bench-ghr"
GRAFANA_DATASOURCE="ef57fux92e9z4e"
LAST_RUN_DURATION=$(( $(date +%s) - LAST_RUN_START ))
FROM_MS=$(( BENCH_REFERENCE_EPOCH * 1000 ))
TO_MS=$(( (BENCH_REFERENCE_EPOCH + LAST_RUN_DURATION) * 1000 ))
GRAFANA_URL="${GRAFANA_BASE_URL}?orgId=1&from=${FROM_MS}&to=${TO_MS}&timezone=browser&var-datasource=${GRAFANA_DATASOURCE}&var-job=reth-bench&var-benchmark_id=${BENCH_ID}&var-benchmark_run=\$__all"
# ── Step 8: Scan logs for errors ─────────────────────────────────────
echo "▸ Scanning logs for errors..."
ERRORS_FILE="$BENCH_WORK_DIR/errors.md"
found_errors=false
for run_dir in baseline-1 feature-1 feature-2 baseline-2; do
LOG="$BENCH_WORK_DIR/$run_dir/node.log"
[ -f "$LOG" ] || continue
panics=$(grep -c -E 'panicked at' "$LOG" 2>/dev/null || true)
errors=$(grep -c ' ERROR ' "$LOG" 2>/dev/null || true)
if [ "$panics" -gt 0 ] || [ "$errors" -gt 0 ]; then
if [ "$found_errors" = false ]; then
printf '### ⚠️ Node Errors\n\n' >> "$ERRORS_FILE"
found_errors=true
fi
printf '<details><summary><b>%s</b>: %d panic(s), %d error(s)</summary>\n\n' \
"$run_dir" "$panics" "$errors" >> "$ERRORS_FILE"
if [ "$panics" -gt 0 ]; then
printf '**Panics:**\n```\n' >> "$ERRORS_FILE"
grep -E 'panicked at' "$LOG" | head -10 >> "$ERRORS_FILE"
printf '```\n' >> "$ERRORS_FILE"
fi
if [ "$errors" -gt 0 ]; then
printf '**Errors (first 20):**\n```\n' >> "$ERRORS_FILE"
grep ' ERROR ' "$LOG" | head -20 >> "$ERRORS_FILE"
printf '```\n' >> "$ERRORS_FILE"
fi
printf '\n</details>\n\n' >> "$ERRORS_FILE"
fi
done
if [ "$found_errors" = true ]; then
echo " ⚠ Errors found — see $ERRORS_FILE"
else
echo " No errors found."
fi
echo
# ── Step 9: Parse results ───────────────────────────────────────────
echo "▸ Parsing results..."
cd "$RETH_REPO"
SUMMARY_ARGS=(
--output-summary "$BENCH_WORK_DIR/summary.json"
--output-markdown "$BENCH_WORK_DIR/comment.md"
--repo "paradigmxyz/reth"
--baseline-ref "$BASELINE_SHA"
--baseline-name "$BASELINE_REF"
--feature-name "$FEATURE_REF"
--feature-ref "$FEATURE_SHA"
--baseline-csv "$BENCH_WORK_DIR/baseline-1/combined_latency.csv" "$BENCH_WORK_DIR/baseline-2/combined_latency.csv"
--feature-csv "$BENCH_WORK_DIR/feature-1/combined_latency.csv" "$BENCH_WORK_DIR/feature-2/combined_latency.csv"
--gas-csv "$BENCH_WORK_DIR/feature-1/total_gas.csv"
--grafana-url "$GRAFANA_URL"
)
python3 "${SCRIPTS_DIR}/bench-reth-summary.py" "${SUMMARY_ARGS[@]}"
echo
# ── Step 10: Generate charts ─────────────────────────────────────────
echo "▸ Generating charts..."
CHART_ARGS=(
--output-dir "$BENCH_WORK_DIR/charts"
--feature "$BENCH_WORK_DIR/feature-1/combined_latency.csv" "$BENCH_WORK_DIR/feature-2/combined_latency.csv"
--baseline "$BENCH_WORK_DIR/baseline-1/combined_latency.csv" "$BENCH_WORK_DIR/baseline-2/combined_latency.csv"
--baseline-name "$BASELINE_REF"
--feature-name "$FEATURE_REF"
)
if python3 -c "import matplotlib" 2>/dev/null; then
python3 "${SCRIPTS_DIR}/bench-reth-charts.py" "${CHART_ARGS[@]}"
elif command -v uv &>/dev/null; then
uv run --with matplotlib python3 "${SCRIPTS_DIR}/bench-reth-charts.py" "${CHART_ARGS[@]}"
else
echo " Warning: matplotlib not available, skipping chart generation."
fi
echo
# ── Step 11: Upload Tracy profiles ────────────────────────────────────
if [ "$TRACY" != "off" ]; then
echo "▸ Uploading Tracy profiles..."
upload_tracy "baseline-1" "$BENCH_WORK_DIR/baseline-1" "$BASELINE_SHA"
upload_tracy "feature-1" "$BENCH_WORK_DIR/feature-1" "$FEATURE_SHA"
upload_tracy "feature-2" "$BENCH_WORK_DIR/feature-2" "$FEATURE_SHA"
upload_tracy "baseline-2" "$BENCH_WORK_DIR/baseline-2" "$BASELINE_SHA"
echo
fi
# ── Done (system restore happens via EXIT trap) ─────────────────────
echo "═══════════════════════════════════════════════════════════"
echo " Benchmark complete!"
echo "═══════════════════════════════════════════════════════════"
echo " Results : $BENCH_WORK_DIR/summary.json"
echo " Markdown : $BENCH_WORK_DIR/comment.md"
echo " Charts : $BENCH_WORK_DIR/charts/"
if [ -f "$ERRORS_FILE" ]; then
echo " Errors : $ERRORS_FILE"
fi
echo " Grafana : $GRAFANA_URL"
if [ "$TRACY" != "off" ]; then
echo " ─── Tracy Profiles ───"
for run_dir in baseline-1 feature-1 feature-2 baseline-2; do
url_file="$BENCH_WORK_DIR/$run_dir/tracy_viewer_url.txt"
if [ -f "$url_file" ]; then
echo " $run_dir : $(cat "$url_file")"
fi
done
fi
echo "═══════════════════════════════════════════════════════════"

View File

@@ -11,8 +11,6 @@
# BENCH_WAIT_TIME (duration like 500ms, default empty)
# BENCH_BASELINE_ARGS (extra reth node args for baseline runs)
# BENCH_FEATURE_ARGS (extra reth node args for feature runs)
# BENCH_OTLP_TRACES_ENDPOINT (OTLP HTTP endpoint for traces, e.g. https://host/insert/opentelemetry/v1/traces)
# BENCH_OTLP_LOGS_ENDPOINT (OTLP HTTP endpoint for logs, e.g. https://host/insert/opentelemetry/v1/logs)
set -euo pipefail
LABEL="$1"
@@ -24,24 +22,6 @@ LOG="${OUTPUT_DIR}/node.log"
cleanup() {
kill "$TAIL_PID" 2>/dev/null || true
# Stop tracy-capture first (SIGINT makes it disconnect and flush to disk)
# Must happen before killing reth, otherwise reth keeps streaming data.
if [ -n "${TRACY_PID:-}" ] && kill -0 "$TRACY_PID" 2>/dev/null; then
echo "Stopping tracy-capture..."
kill -INT "$TRACY_PID" 2>/dev/null || true
for i in $(seq 1 30); do
kill -0 "$TRACY_PID" 2>/dev/null || break
if [ $((i % 10)) -eq 0 ]; then
echo "Waiting for tracy-capture to finish writing... (${i}s)"
fi
sleep 1
done
if kill -0 "$TRACY_PID" 2>/dev/null; then
echo "tracy-capture still running after 30s, killing..."
kill -9 "$TRACY_PID" 2>/dev/null || true
fi
wait "$TRACY_PID" 2>/dev/null || true
fi
if [ -n "${RETH_PID:-}" ] && sudo kill -0 "$RETH_PID" 2>/dev/null; then
if [ "${BENCH_SAMPLY:-false}" = "true" ]; then
# Send SIGINT to the inner reth process by exact name (not -f which
@@ -78,7 +58,6 @@ cleanup() {
fi
}
TAIL_PID=
TRACY_PID=
trap cleanup EXIT
# Clean up stale schelk state from a previous cancelled run.
@@ -137,51 +116,16 @@ if [ -n "$EXTRA_NODE_ARGS" ]; then
RETH_ARGS+=($EXTRA_NODE_ARGS)
fi
if [ -n "${BENCH_METRICS_ADDR:-}" ]; then
RETH_ARGS+=(--metrics "$BENCH_METRICS_ADDR")
fi
# OTLP traces and logs export
if [ -n "${BENCH_OTLP_TRACES_ENDPOINT:-}" ]; then
RETH_ARGS+=(--tracing-otlp="${BENCH_OTLP_TRACES_ENDPOINT}" --tracing-otlp.service-name=reth-bench)
fi
if [ -n "${BENCH_OTLP_LOGS_ENDPOINT:-}" ]; then
RETH_ARGS+=(--logs-otlp="${BENCH_OTLP_LOGS_ENDPOINT}" --logs-otlp.filter=debug)
fi
# Tracy profiling: add --log.tracy flags and set environment
if [ "${BENCH_TRACY:-off}" != "off" ]; then
RETH_ARGS+=(--log.tracy --log.tracy.filter "${BENCH_TRACY_FILTER:-debug}")
if [ "${BENCH_TRACY}" = "on" ]; then
export TRACY_NO_SYS_TRACE=1
elif [ "${BENCH_TRACY}" = "full" ]; then
export TRACY_SAMPLING_HZ="${BENCH_TRACY_SAMPLING_HZ:-1}"
fi
fi
SUDO_ENV=()
if [ -n "${OTEL_RESOURCE_ATTRIBUTES:-}" ]; then
SUDO_ENV+=("OTEL_RESOURCE_ATTRIBUTES=${OTEL_RESOURCE_ATTRIBUTES}")
SUDO_ENV+=("OTEL_BSP_MAX_QUEUE_SIZE=65536" "OTEL_BLRP_MAX_QUEUE_SIZE=65536")
fi
# Limit reth memory to 95% of available RAM to prevent OOM kills
TOTAL_MEM_KB=$(awk '/^MemTotal:/ {print $2}' /proc/meminfo)
MEM_LIMIT=$(( TOTAL_MEM_KB * 95 / 100 * 1024 ))
echo "Memory limit: $(( MEM_LIMIT / 1024 / 1024 ))MB (95% of $(( TOTAL_MEM_KB / 1024 ))MB)"
if [ "${BENCH_SAMPLY:-false}" = "true" ]; then
RETH_ARGS+=(--log.samply)
SAMPLY="$(which samply)"
sudo systemd-run --scope -p MemoryMax="$MEM_LIMIT" -p AllowedCPUs="$RETH_CPUS" \
env "${SUDO_ENV[@]}" nice -n -20 \
sudo taskset -c "$RETH_CPUS" nice -n -20 \
"$SAMPLY" record --save-only --presymbolicate --rate 10000 \
--output "$OUTPUT_DIR/samply-profile.json.gz" \
-- "$BINARY" "${RETH_ARGS[@]}" \
> "$LOG" 2>&1 &
else
sudo systemd-run --scope -p MemoryMax="$MEM_LIMIT" -p AllowedCPUs="$RETH_CPUS" \
env "${SUDO_ENV[@]}" nice -n -20 "$BINARY" "${RETH_ARGS[@]}" \
sudo taskset -c "$RETH_CPUS" nice -n -20 "$BINARY" "${RETH_ARGS[@]}" \
> "$LOG" 2>&1 &
fi
@@ -225,15 +169,6 @@ if [ "$BIG_BLOCKS" = "true" ]; then
GAS_RAMP_COUNT=$(find "$BIG_BLOCKS_DIR/gas-ramp-dir" -name '*.json' | wc -l)
echo "$GAS_RAMP_COUNT" > "$OUTPUT_DIR/gas_ramp_blocks.txt"
echo "Gas ramp blocks: $GAS_RAMP_COUNT"
# Start tracy-capture so profile only covers the benchmark
if [ "${BENCH_TRACY:-off}" != "off" ]; then
echo "Starting tracy-capture..."
tracy-capture -f -o "$OUTPUT_DIR/tracy-profile.tracy" &
TRACY_PID=$!
sleep 0.5 # give tracy-capture time to connect
fi
echo "Running big blocks benchmark (replay-payloads)..."
$BENCH_NICE "$RETH_BENCH" replay-payloads \
"${EXTRA_BENCH_ARGS[@]}" \
@@ -252,14 +187,6 @@ else
--advance "${BENCH_WARMUP_BLOCKS:-50}" \
"${EXTRA_BENCH_ARGS[@]}" 2>&1 | sed -u "s/^/[bench] /"
# Start tracy-capture after warmup so profile only covers the benchmark
if [ "${BENCH_TRACY:-off}" != "off" ]; then
echo "Starting tracy-capture..."
tracy-capture -f -o "$OUTPUT_DIR/tracy-profile.tracy" &
TRACY_PID=$!
sleep 0.5 # give tracy-capture time to connect
fi
# Benchmark
$BENCH_NICE "$RETH_BENCH" new-payload-fcu \
--rpc-url "$BENCH_RPC_URL" \

View File

@@ -422,7 +422,6 @@ def generate_markdown(
summary: dict, comparison_table: str,
wait_time_tables: list[str] | None = None,
behind_baseline: int = 0, repo: str = "", baseline_ref: str = "", baseline_name: str = "",
grafana_url: str | None = None,
) -> str:
"""Generate a markdown comment body."""
lines = ["## Benchmark Results", ""]
@@ -442,9 +441,6 @@ def generate_markdown(
lines.append(table)
lines.append("")
lines.append("</details>")
if grafana_url:
lines.append("")
lines.append(f"**[Grafana Dashboard]({grafana_url})**")
return "\n".join(lines)
@@ -473,7 +469,6 @@ def main():
parser.add_argument("--behind-baseline", "--behind-main", type=int, default=0, help="Commits behind baseline")
parser.add_argument("--big-blocks", action="store_true", default=False, help="Big blocks mode")
parser.add_argument("--gas-ramp-blocks", type=int, default=0, help="Number of gas ramp blocks (big blocks mode)")
parser.add_argument("--grafana-url", default=None, help="Grafana dashboard URL for this benchmark run")
args = parser.parse_args()
if len(args.baseline_csv) != len(args.feature_csv):
@@ -580,7 +575,6 @@ def main():
repo=args.repo,
baseline_ref=baseline_ref,
baseline_name=baseline_name,
grafana_url=args.grafana_url,
)
with open(args.output_markdown, "w") as f:

View File

@@ -7,8 +7,6 @@
// BENCH_PR PR number (may be empty)
// BENCH_ACTOR GitHub user who triggered the bench
// BENCH_JOB_URL URL to the Actions job page
// BENCH_BASELINE_ARGS Extra CLI args for the baseline reth node
// BENCH_FEATURE_ARGS Extra CLI args for the feature reth node
// BENCH_SAMPLY 'true' if samply profiling was enabled
//
// Usage from actions/github-script:
@@ -134,13 +132,7 @@ function buildSuccessBlocks({ summary, prNumber, actor, actorSlackId, jobUrl, re
if (cores !== '0') countsParts.push(`*Cores:* ${cores}`);
const countsLine = countsParts.join(' | ');
const baselineArgs = process.env.BENCH_BASELINE_ARGS || '';
const featureArgs = process.env.BENCH_FEATURE_ARGS || '';
const argsLines = [];
if (baselineArgs) argsLines.push(`*Baseline Args:* \`${baselineArgs}\``);
if (featureArgs) argsLines.push(`*Feature Args:* \`${featureArgs}\``);
const sectionText = [metaParts.join(' | '), '', baselineLine, featureLine, ...argsLines, countsLine].join('\n');
const sectionText = [metaParts.join(' | '), '', baselineLine, featureLine, countsLine].join('\n');
// Action buttons
const diffUrl = `https://github.com/${repo}/compare/${summary.baseline.ref}...${summary.feature.ref}`;

View File

@@ -1,414 +0,0 @@
#!/usr/bin/env bash
#
# Full PGO+BOLT optimized build for reth using real reth-bench workloads.
#
# Phases:
# 1. Build PGO-instrumented reth, run reth-bench → collect PGO profiles
# 2. Build BOLT-instrumented reth (with PGO), run reth-bench → collect BOLT profiles
# 3. Build final PGO+BOLT optimized binary
#
# Required environment variables:
# DATADIR - Path to reth datadir (must already contain chain data)
# RPC_URL - Source RPC URL for reth-bench to fetch payloads from
#
# Optional environment variables:
# PGO_BLOCKS - Number of blocks for PGO profiling (default: 20)
# BOLT_BLOCKS - Number of blocks for BOLT profiling (default: 20)
# SKIP_BOLT - Temporarily skip BOLT phases (default: false)
# STRIP_SYMBOLS - Strip debug symbols from output binary (default: true)
# COLLECT_PGO_ONLY - Stop after producing merged.profdata (default: false)
# PGO_PROFDATA - Path to pre-collected merged.profdata (optional)
# PROFILE - Cargo profile (default: maxperf-symbols)
# FEATURES - Cargo features (default: jemalloc,asm-keccak,min-debug-logs)
# TARGET - Target triple (default: auto-detected)
# EXTRA_RUSTFLAGS - Additional RUSTFLAGS (e.g. -C target-cpu=x86-64-v3)
#
# Output:
# target/$PROFILE_DIR/reth — final optimized binary
set -euo pipefail
# Open a named log section.
#   $1 - section title
# Under GitHub Actions (GITHUB_ACTIONS non-empty) this emits a `::group::`
# workflow command; otherwise it prints a blank line followed by a banner.
gha_section_start() {
    local heading="$1"

    if [ -z "${GITHUB_ACTIONS:-}" ]; then
        echo ""
        echo "=== $heading ==="
        return
    fi

    echo "::group::$heading"
}
# Close the log section opened by gha_section_start.
# Prints `::endgroup::` under GitHub Actions and nothing elsewhere.
gha_section_end() {
    # Written as an explicit `if` + `return 0` so the function never reports
    # failure to a caller running under `set -e`.
    if [ -z "${GITHUB_ACTIONS:-}" ]; then
        return 0
    fi
    echo "::endgroup::"
}
cd "$(dirname "$0")/../.."
# ── Configuration ──────────────────────────────────────────────────────────────
PGO_BLOCKS="${PGO_BLOCKS:-20}"
BOLT_BLOCKS="${BOLT_BLOCKS:-20}"
SKIP_BOLT="${SKIP_BOLT:-false}"
STRIP_SYMBOLS="${STRIP_SYMBOLS:-true}"
COLLECT_PGO_ONLY="${COLLECT_PGO_ONLY:-false}"
PROFILE="${PROFILE:-maxperf-symbols}"
FEATURES="${FEATURES:-jemalloc,asm-keccak,min-debug-logs}"
TARGET="${TARGET:-$(rustc -Vv | grep host | cut -d' ' -f2)}"
BASE_RUSTFLAGS="${RUSTFLAGS:-}"
EXTRA_RUSTFLAGS="${EXTRA_RUSTFLAGS:-}"
COMBINED_RUSTFLAGS="$BASE_RUSTFLAGS $EXTRA_RUSTFLAGS"
PGO_PROFDATA="${PGO_PROFDATA:-}"
DATADIR="${DATADIR:-}"
RPC_URL="${RPC_URL:-}"
SKIP_BOLT_BOOL=false
if [[ "${SKIP_BOLT,,}" == "true" || "$SKIP_BOLT" == "1" ]]; then
SKIP_BOLT_BOOL=true
fi
STRIP_SYMBOLS_BOOL=false
if [[ "${STRIP_SYMBOLS,,}" == "true" || "$STRIP_SYMBOLS" == "1" ]]; then
STRIP_SYMBOLS_BOOL=true
fi
COLLECT_PGO_ONLY_BOOL=false
if [[ "${COLLECT_PGO_ONLY,,}" == "true" || "$COLLECT_PGO_ONLY" == "1" ]]; then
COLLECT_PGO_ONLY_BOOL=true
fi
# A non-empty PGO_PROFDATA selects a pre-collected profile; it must point to an
# existing regular file, otherwise the script aborts before doing any work.
USE_PRECOLLECTED_PGO=false
if [ -n "$PGO_PROFDATA" ]; then
if [ ! -f "$PGO_PROFDATA" ]; then
echo "error: PGO_PROFDATA points to a missing file: $PGO_PROFDATA"
exit 1
fi
USE_PRECOLLECTED_PGO=true
fi
# The reth-bench workload is unnecessary only when the PGO profile is supplied
# AND the BOLT phases are skipped; in every other combination it must run.
NEEDS_BENCH_WORKLOAD=true
if [ "$USE_PRECOLLECTED_PGO" = true ] && [ "$SKIP_BOLT_BOOL" = true ]; then
NEEDS_BENCH_WORKLOAD=false
fi
# DATADIR and RPC_URL are required only when a workload will run; `${VAR:?msg}`
# aborts the script with the message if the variable is unset or empty.
if [ "$NEEDS_BENCH_WORKLOAD" = true ]; then
: "${DATADIR:?DATADIR must be set to the reth data directory}"
: "${RPC_URL:?RPC_URL must be set}"
fi
if [[ "$PROFILE" == dev ]]; then
PROFILE_DIR=debug
else
PROFILE_DIR=$PROFILE
fi
MANIFEST_PATH="bin/reth"
LLVM_VERSION=$(rustc -Vv | grep -oP 'LLVM version: \K\d+')
PGO_DIR="$PWD/target/pgo-profiles"
BOLT_DIR="$PWD/target/bolt-profiles"
CARGO_ARGS=(--profile "$PROFILE" --features "$FEATURES" --manifest-path "$MANIFEST_PATH/Cargo.toml" --bin "reth" --locked)
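# Override the profile's strip setting so only debuginfo is stripped and the
# symbol table is kept: BOLT requires symbols to reorder code.
# The final binary is stripped at the end when STRIP_SYMBOLS is enabled.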
PROFILE_UPPER=$(echo "$PROFILE" | tr '[:lower:]-' '[:upper:]_')
export "CARGO_PROFILE_${PROFILE_UPPER}_STRIP=debuginfo"
gha_section_start "Full PGO+BOLT Build"
echo "Binary: reth"
echo "Manifest: $MANIFEST_PATH"
echo "Target: $TARGET"
echo "Profile: $PROFILE"
echo "Features: $FEATURES"
echo "LLVM: $LLVM_VERSION"
echo "PGO blocks: $PGO_BLOCKS"
echo "BOLT blocks: $BOLT_BLOCKS"
echo "Skip BOLT: $SKIP_BOLT"
echo "Strip symbols: $STRIP_SYMBOLS"
echo "Collect only: $COLLECT_PGO_ONLY"
echo "PGO profdata: ${PGO_PROFDATA:-<collect with reth-bench>}"
echo "RUSTFLAGS: ${BASE_RUSTFLAGS:-<unset>}"
echo "EXTRA_RUSTFLAGS: ${EXTRA_RUSTFLAGS:-<unset>}"
if [ "$NEEDS_BENCH_WORKLOAD" = true ]; then
echo "Datadir: $DATADIR"
echo "RPC URL: $RPC_URL"
else
echo "Datadir: <not required>"
echo "RPC URL: <not required>"
fi
gha_section_end
# ── Prerequisites ──────────────────────────────────────────────────────────────
gha_section_start "Installing prerequisites"
rustup component add llvm-tools-preview
LLVM_PROFDATA=$(find "$(rustc --print sysroot)" -name llvm-profdata -type f | head -1)
if [ -z "$LLVM_PROFDATA" ]; then
echo "error: llvm-profdata not found"
exit 1
fi
# Make llvm-bolt and merge-fdata available for the toolchain's LLVM version.
# Does nothing when llvm-bolt is already on PATH; otherwise installs the
# `bolt-$LLVM_VERSION` package from apt.llvm.org and symlinks the versioned
# tools into /usr/local/bin under their unversioned names.
install_bolt() {
    local tool

    if ! command -v llvm-bolt &>/dev/null; then
        echo "Installing BOLT from apt.llvm.org..."

        # Register the repository signing key and the apt source for this
        # distribution codename.
        wget -qO- https://apt.llvm.org/llvm-snapshot.gpg.key \
            | sudo tee /etc/apt/trusted.gpg.d/apt.llvm.org.asc >/dev/null
        CODENAME=$(lsb_release -cs)
        echo "deb http://apt.llvm.org/$CODENAME/ llvm-toolchain-$CODENAME-$LLVM_VERSION main" \
            | sudo tee /etc/apt/sources.list.d/llvm.list >/dev/null

        sudo apt-get update -qq
        sudo apt-get install -y -qq "bolt-$LLVM_VERSION"

        # The package ships version-suffixed binaries; the rest of the script
        # calls them by their plain names.
        for tool in llvm-bolt merge-fdata; do
            sudo ln -sf "/usr/bin/$tool-$LLVM_VERSION" "/usr/local/bin/$tool"
        done
    else
        echo "BOLT already installed"
    fi
}
if [ "$SKIP_BOLT_BOOL" = true ]; then
echo "Skipping BOLT installation (SKIP_BOLT=$SKIP_BOLT)"
else
install_bolt
fi
gha_section_end
# Build reth-bench once (non-instrumented); the same binary drives both the PGO
# and the BOLT profiling workloads. Skipped when no workload will be run.
if [ "$NEEDS_BENCH_WORKLOAD" = true ]; then
    gha_section_start "Building reth-bench"
    RUSTFLAGS="$COMBINED_RUSTFLAGS" \
        cargo build --profile "$PROFILE" --features "$FEATURES" \
        --manifest-path bin/reth-bench/Cargo.toml --bin reth-bench --locked
    RETH_BENCH_BIN="$(find target -name reth-bench -type f -executable | head -1)"
    # Fail here rather than inside run_bench_workload, where an empty path
    # would only surface after an instrumented node has been built and started.
    if [ -z "$RETH_BENCH_BIN" ]; then
        echo "error: reth-bench binary not found under target/"
        exit 1
    fi
    echo "reth-bench: $RETH_BENCH_BIN"
    gha_section_end
else
    gha_section_start "Building reth-bench"
    echo "Skipping reth-bench build (pre-collected PGO with SKIP_BOLT=true)"
    gha_section_end
fi
# ── Helpers ────────────────────────────────────────────────────────────────────
RETH_PID=
# EXIT-trap handler: stop a still-running reth node.
# Sends SIGTERM, waits up to 60 seconds for the process to go away, then sends
# SIGKILL as a last resort. Does nothing when RETH_PID is empty or the process
# has already exited.
cleanup() {
    # reth is launched through sudo, so the tracked PID is not owned by the
    # invoking user and every probe/signal has to go through sudo as well.
    # A plain `kill -0` is refused for such a process, which made this guard
    # false and left the node running after the script exited.
    if [ -n "${RETH_PID:-}" ] && sudo kill -0 "$RETH_PID" 2>/dev/null; then
        echo "Stopping reth (pid $RETH_PID)..."
        sudo kill "$RETH_PID" 2>/dev/null || true
        for i in $(seq 1 60); do
            # Leave the wait loop as soon as the process is gone.
            sudo kill -0 "$RETH_PID" 2>/dev/null || break
            if [ $((i % 10)) -eq 0 ]; then
                echo "  waiting... (${i}s)"
            fi
            sleep 1
        done
        # Harmless if the process already exited; errors are ignored.
        sudo kill -9 "$RETH_PID" 2>/dev/null || true
    fi
}
trap cleanup EXIT
# Run one profiling workload: launch reth, wait until its HTTP RPC answers,
# replay blocks through reth-bench, then shut the node down again.
#   $1 - path to the reth binary to run
#   $2 - number of blocks to advance
#   $3 - label used for log paths and as the output prefix
# Sets RETH_PID while the node is running so the EXIT trap can stop it, and
# clears it on a normal finish. Exits the script if the RPC is not reachable
# after 120 probes, one second apart.
run_bench_workload() {
    local node_bin="$1"
    local block_count="$2"
    local tag="$3"
    local rpc_port=8545
    local engine_port=8551
    local node_log="/tmp/reth-${tag}.log"
    local probe='{"jsonrpc":"2.0","method":"eth_blockNumber","params":[],"id":1}'

    echo "--- Starting reth ($tag) ---"
    sudo "$node_bin" node \
        --datadir "$DATADIR" \
        --log.file.directory "/tmp/reth-${tag}-logs" \
        --engine.accept-execution-requests-hash \
        --http --http.port "$rpc_port" \
        --authrpc.port "$engine_port" \
        --disable-discovery --no-persist-peers \
        > "$node_log" 2>&1 &
    RETH_PID=$!

    # Probe the HTTP RPC once per second until it responds.
    echo "Waiting for reth RPC..."
    for i in $(seq 1 120); do
        if curl -sf "http://127.0.0.1:$rpc_port" -X POST \
            -H 'Content-Type: application/json' \
            -d "$probe" \
            > /dev/null 2>&1; then
            echo "reth is ready after ${i}s"
            break
        fi
        if [ "$i" -eq 120 ]; then
            echo "error: reth failed to start within 120s"
            cat "$node_log"
            exit 1
        fi
        sleep 1
    done

    echo "Running reth-bench ($block_count blocks)..."
    "$RETH_BENCH_BIN" new-payload-fcu \
        --rpc-url "$RPC_URL" \
        --engine-rpc-url "http://127.0.0.1:$engine_port" \
        --jwt-secret "$DATADIR/jwt.hex" \
        --advance "$block_count" \
        --reth-new-payload 2>&1 | sed -u "s/^/[$tag] /"

    # SIGTERM first, wait up to 60 seconds for the process to go away, then
    # SIGKILL as a last resort.
    echo "Stopping reth ($tag)..."
    sudo kill "$RETH_PID" 2>/dev/null || true
    for i in $(seq 1 60); do
        sudo kill -0 "$RETH_PID" 2>/dev/null || break
        sleep 1
    done
    sudo kill -9 "$RETH_PID" 2>/dev/null || true
    RETH_PID=
}
# Copy the finished binary to both output locations:
# target/$TARGET/$PROFILE_DIR/reth and target/$PROFILE_DIR/reth.
#   $1 - path of the binary to publish
publish_binary() {
    local built="$1"
    local out_dir target_path

    for out_dir in "target/$TARGET/$PROFILE_DIR" "target/$PROFILE_DIR"; do
        target_path="$out_dir/reth"
        mkdir -p "$out_dir"
        # Copy unless the destination already exists and is the very same file
        # as the source (same device and inode).
        if ! { [ -e "$target_path" ] && [ "$built" -ef "$target_path" ]; }; then
            cp "$built" "$target_path"
        fi
    done
}
if [ "$USE_PRECOLLECTED_PGO" = true ]; then
gha_section_start "Phase 1: Using Pre-Collected PGO Profile"
rm -rf "$PGO_DIR"
mkdir -p "$PGO_DIR"
cp "$PGO_PROFDATA" "$PGO_DIR/merged.profdata"
echo "Using pre-collected profile: $PGO_PROFDATA"
echo "PGO profile: $PGO_DIR/merged.profdata ($(ls -lh "$PGO_DIR/merged.profdata" | awk '{print $5}'))"
gha_section_end
else
# ── Phase 1: PGO profile collection ───────────────────────────────────────
gha_section_start "Phase 1: PGO Profile Collection"
rm -rf "$PGO_DIR"
mkdir -p "$PGO_DIR"
echo "Building PGO-instrumented binary..."
RUSTFLAGS="-Cprofile-generate=$PGO_DIR -Crelocation-model=pic $COMBINED_RUSTFLAGS" \
cargo build "${CARGO_ARGS[@]}" --target "$TARGET"
PGO_RETH_BIN="$PWD/target/$TARGET/$PROFILE_DIR/reth"
echo "Instrumented binary: $PGO_RETH_BIN ($(ls -lh "$PGO_RETH_BIN" | awk '{print $5}'))"
run_bench_workload "$PGO_RETH_BIN" "$PGO_BLOCKS" "pgo"
# Fix ownership if reth ran as root.
sudo chown -R "$(id -un):$(id -gn)" "$PGO_DIR" 2>/dev/null || true
# Merge PGO profiles.
echo "Merging PGO profiles..."
PROFRAW_COUNT=$(find "$PGO_DIR" -name '*.profraw' | wc -l)
echo "Found $PROFRAW_COUNT .profraw files"
if [ "$PROFRAW_COUNT" -eq 0 ]; then
echo "error: no .profraw files — instrumented binary did not produce profiles"
exit 1
fi
"$LLVM_PROFDATA" merge -o "$PGO_DIR/merged.profdata" "$PGO_DIR"/*.profraw
echo "PGO profile: $PGO_DIR/merged.profdata ($(ls -lh "$PGO_DIR/merged.profdata" | awk '{print $5}'))"
gha_section_end
fi
if [ "$COLLECT_PGO_ONLY_BOOL" = true ]; then
gha_section_start "PGO Collection Complete"
echo "COLLECT_PGO_ONLY=true, skipping PGO/BOLT optimized binary build"
echo "Profile: $PGO_DIR/merged.profdata"
gha_section_end
exit 0
fi
if [ "$SKIP_BOLT_BOOL" = true ]; then
gha_section_start "BOLT Phase Skipped"
echo "SKIP_BOLT=$SKIP_BOLT, building PGO-only binary"
echo "Building PGO-optimized binary..."
RUSTFLAGS="-Cprofile-use=$PGO_DIR/merged.profdata $COMBINED_RUSTFLAGS" \
cargo build "${CARGO_ARGS[@]}" --target "$TARGET"
BUILT_BIN="$PWD/target/$TARGET/$PROFILE_DIR/reth"
if [ "$STRIP_SYMBOLS_BOOL" = true ]; then
echo "Stripping debug symbols..."
strip "$BUILT_BIN"
else
echo "Skipping strip (STRIP_SYMBOLS=$STRIP_SYMBOLS)"
fi
publish_binary "$BUILT_BIN"
gha_section_end
else
# ── Phase 2: BOLT profile collection (with PGO) ──────────────────────────
gha_section_start "Phase 2: BOLT Profile Collection (with PGO)"
rm -rf "$BOLT_DIR"
mkdir -p "$BOLT_DIR"
echo "Building BOLT-instrumented binary with PGO..."
# --emit-relocs preserves relocation entries in the binary, required by llvm-bolt -instrument
RUSTFLAGS="-Cprofile-use=$PGO_DIR/merged.profdata -Clink-arg=-Wl,--emit-relocs $COMBINED_RUSTFLAGS" \
cargo build "${CARGO_ARGS[@]}" --target "$TARGET"
# Instrument with BOLT
BUILT_BIN="$PWD/target/$TARGET/$PROFILE_DIR/reth"
BOLT_INSTRUMENTED_BIN="$BUILT_BIN-bolt-instrumented"
echo "Instrumenting binary with BOLT..."
# --skip-funcs: skip compiler-generated drop_in_place functions that BOLT can't handle
# as split functions in relocation mode (triggered by --emit-relocs)
llvm-bolt "$BUILT_BIN" \
-instrument \
--instrumentation-file-append-pid \
--instrumentation-file="$BOLT_DIR/prof" \
--skip-funcs='.*drop_in_place.*' \
-o "$BOLT_INSTRUMENTED_BIN"
echo "BOLT-instrumented binary: $BOLT_INSTRUMENTED_BIN ($(ls -lh "$BOLT_INSTRUMENTED_BIN" | awk '{print $5}'))"
run_bench_workload "$BOLT_INSTRUMENTED_BIN" "$BOLT_BLOCKS" "bolt"
# Fix ownership for BOLT profiles
sudo chown -R "$(id -un):$(id -gn)" "$BOLT_DIR" 2>/dev/null || true
# Merge BOLT profiles
echo "Merging BOLT profiles..."
FDATA_COUNT=$(find "$BOLT_DIR" -name '*.fdata' | wc -l)
echo "Found $FDATA_COUNT .fdata files"
if [ "$FDATA_COUNT" -eq 0 ]; then
echo "error: no .fdata files — BOLT-instrumented binary did not produce profiles"
exit 1
fi
merge-fdata "$BOLT_DIR"/*.fdata > "$BOLT_DIR/merged.fdata"
echo "BOLT profile: $BOLT_DIR/merged.fdata ($(ls -lh "$BOLT_DIR/merged.fdata" | awk '{print $5}'))"
gha_section_end
# ── Phase 3: Final optimized build ───────────────────────────────────────
gha_section_start "Phase 3: Final PGO+BOLT Optimized Build"
echo "Building PGO-optimized binary..."
# --emit-relocs preserves relocation entries in the binary, required by llvm-bolt for code reordering
RUSTFLAGS="-Cprofile-use=$PGO_DIR/merged.profdata -Clink-arg=-Wl,--emit-relocs $COMBINED_RUSTFLAGS" \
cargo build "${CARGO_ARGS[@]}" --target "$TARGET"
BUILT_BIN="$PWD/target/$TARGET/$PROFILE_DIR/reth"
OPTIMIZED_BIN="$BUILT_BIN-bolt-optimized"
echo "Optimizing with BOLT..."
llvm-bolt "$BUILT_BIN" \
-o "$OPTIMIZED_BIN" \
--data "$BOLT_DIR/merged.fdata" \
-reorder-blocks=ext-tsp \
-reorder-functions=cdsort \
-split-functions \
-split-all-cold \
-dyno-stats \
-icf=1 \
-use-gnu-stack \
--skip-funcs='.*drop_in_place.*'
if [ "$STRIP_SYMBOLS_BOOL" = true ]; then
echo "Stripping debug symbols..."
strip "$OPTIMIZED_BIN"
else
echo "Skipping strip (STRIP_SYMBOLS=$STRIP_SYMBOLS)"
fi
publish_binary "$OPTIMIZED_BIN"
gha_section_end
fi
gha_section_start "Build Complete"
ls -lh "target/$PROFILE_DIR/reth"
echo "Output: target/$PROFILE_DIR/reth"
gha_section_end

View File

@@ -8,7 +8,7 @@
on:
issue_comment:
types: [created]
types: [created, edited]
workflow_dispatch:
inputs:
blocks:
@@ -31,6 +31,21 @@ on:
required: false
default: ""
type: string
samply:
description: "Enable samply profiling"
required: false
default: "false"
type: boolean
cores:
description: "Limit reth to N CPU cores (0 = all available)"
required: false
default: "0"
type: string
reth_newPayload:
description: "Use reth_newPayload RPC (server-side timing)"
required: false
default: "true"
type: boolean
wait_time:
description: "Fixed wait time between blocks (e.g. 500ms, 1s)"
required: false
@@ -46,31 +61,6 @@ on:
required: false
default: ""
type: string
samply:
description: "Enable samply profiling"
required: false
default: "false"
type: boolean
reth_newPayload:
description: "Use reth_newPayload RPC (server-side timing)"
required: false
default: "true"
type: boolean
cores:
description: "Limit reth to N CPU cores (0 = all available)"
required: false
default: "0"
type: string
no_slack:
description: "Suppress Slack notifications for benchmark results"
required: false
default: "true"
type: boolean
abba:
description: "Run ABBA (BFFB) interleaved order; false = single AB pass"
required: false
default: "true"
type: boolean
env:
CARGO_TERM_COLOR: always
@@ -102,14 +92,12 @@ jobs:
baseline-name: ${{ steps.args.outputs.baseline-name }}
feature-name: ${{ steps.args.outputs.feature-name }}
samply: ${{ steps.args.outputs.samply }}
no-slack: ${{ steps.args.outputs.no-slack }}
cores: ${{ steps.args.outputs.cores }}
big-blocks: ${{ steps.args.outputs.big-blocks }}
reth-new-payload: ${{ steps.args.outputs.reth-new-payload }}
wait-time: ${{ steps.args.outputs.wait-time }}
baseline-args: ${{ steps.args.outputs.baseline-args }}
feature-args: ${{ steps.args.outputs.feature-args }}
abba: ${{ steps.args.outputs.abba }}
comment-id: ${{ steps.ack.outputs.comment-id }}
steps:
- name: Check org membership
@@ -146,11 +134,9 @@ jobs:
baseline = '${{ github.event.inputs.baseline }}';
feature = '${{ github.event.inputs.feature }}';
samply = '${{ github.event.inputs.samply }}' === 'true' ? 'true' : 'false';
var noSlack = '${{ github.event.inputs.no_slack }}' !== 'false' ? 'true' : 'false';
cores = '${{ github.event.inputs.cores }}' || '0';
bigBlocks = blocks === 'big' ? 'true' : 'false';
var rethNewPayload = '${{ github.event.inputs.reth_newPayload }}' !== 'false' ? 'true' : 'false';
var abba = '${{ github.event.inputs.abba }}' !== 'false' ? 'true' : 'false';
var waitTime = '${{ github.event.inputs.wait_time }}' || '';
var baselineNodeArgs = '${{ github.event.inputs.baseline_args }}' || '';
var featureNodeArgs = '${{ github.event.inputs.feature_args }}' || '';
@@ -176,11 +162,11 @@ jobs:
const intArgs = new Set(['warmup', 'cores']);
const intOrKeywordArgs = new Map([['blocks', new Set(['big'])]]);
const refArgs = new Set(['baseline', 'feature']);
const boolArgs = new Set(['samply', 'no-slack']);
const boolDefaultTrue = new Set(['reth_newPayload', 'abba']);
const boolArgs = new Set(['samply']);
const boolDefaultTrue = new Set(['reth_newPayload']);
const durationArgs = new Set(['wait-time']);
const stringArgs = new Set(['baseline-args', 'feature-args']);
const defaults = { blocks: '500', warmup: '100', baseline: '', feature: '', samply: 'false', 'no-slack': 'false', cores: '0', reth_newPayload: 'true', abba: 'true', 'wait-time': '', 'baseline-args': '', 'feature-args': '' };
const defaults = { blocks: '500', warmup: '100', baseline: '', feature: '', samply: 'false', cores: '0', reth_newPayload: 'true', 'wait-time': '', 'baseline-args': '', 'feature-args': '' };
const unknown = [];
const invalid = [];
const args = body.replace(/^(?:@decofe|derek) bench\s*/, '');
@@ -250,7 +236,7 @@ jobs:
if (unknown.length) errors.push(`Unknown argument(s): \`${unknown.join('`, `')}\``);
if (invalid.length) errors.push(`Invalid value(s): ${invalid.join(', ')}`);
if (errors.length) {
const msg = `❌ **Invalid bench command**\n\n${errors.join('\n')}\n\n**Usage:** \`@decofe bench [blocks=N|big] [warmup=N] [baseline=REF] [feature=REF] [samply] [no-slack] [cores=N] [reth_newPayload=true|false] [abba=true|false] [wait-time=DURATION] [baseline-args="..."] [feature-args="..."]\``;
const msg = `❌ **Invalid bench command**\n\n${errors.join('\n')}\n\n**Usage:** \`@decofe bench [blocks=N|big] [warmup=N] [baseline=REF] [feature=REF] [samply] [cores=N] [reth_newPayload=true|false] [wait-time=DURATION] [baseline-args="..."] [feature-args="..."]\``;
await github.rest.issues.createComment({
owner: context.repo.owner,
repo: context.repo.repo,
@@ -265,11 +251,9 @@ jobs:
baseline = defaults.baseline;
feature = defaults.feature;
samply = defaults.samply;
var noSlack = defaults['no-slack'];
cores = defaults.cores;
bigBlocks = blocks === 'big' ? 'true' : 'false';
var rethNewPayload = defaults.reth_newPayload;
var abba = defaults.abba;
var waitTime = defaults['wait-time'];
var baselineNodeArgs = defaults['baseline-args'];
var featureNodeArgs = defaults['feature-args'];
@@ -300,14 +284,12 @@ jobs:
core.setOutput('baseline-name', baselineName);
core.setOutput('feature-name', featureName);
core.setOutput('samply', samply);
core.setOutput('no-slack', noSlack);
core.setOutput('cores', cores);
core.setOutput('big-blocks', bigBlocks);
core.setOutput('reth-new-payload', rethNewPayload);
core.setOutput('wait-time', waitTime);
core.setOutput('baseline-args', baselineNodeArgs);
core.setOutput('feature-args', featureNodeArgs);
core.setOutput('abba', abba);
- name: Acknowledge request
id: ack
@@ -365,16 +347,12 @@ jobs:
const baseline = '${{ steps.args.outputs.baseline-name }}';
const feature = '${{ steps.args.outputs.feature-name }}';
const samply = '${{ steps.args.outputs.samply }}' === 'true';
const noSlack = '${{ steps.args.outputs.no-slack }}' === 'true';
const bigBlocks = '${{ steps.args.outputs.big-blocks }}' === 'true';
const samplyNote = samply ? ', samply: `enabled`' : '';
const noSlackNote = noSlack ? ', no-slack' : '';
const cores = '${{ steps.args.outputs.cores }}';
const coresNote = cores && cores !== '0' ? `, cores: \`${cores}\`` : '';
const rethNP = '${{ steps.args.outputs.reth-new-payload }}' !== 'false';
const rethNPNote = !rethNP ? ', reth_newPayload: `disabled`' : '';
const abbaEnabled = '${{ steps.args.outputs.abba }}' !== 'false';
const abbaNote = !abbaEnabled ? ', abba: `disabled`' : '';
const waitTimeVal = '${{ steps.args.outputs.wait-time }}';
const waitTimeNote = waitTimeVal ? `, wait-time: \`${waitTimeVal}\`` : '';
const baselineArgsVal = '${{ steps.args.outputs.baseline-args }}';
@@ -382,7 +360,7 @@ jobs:
const featureArgsVal = '${{ steps.args.outputs.feature-args }}';
const featureArgsNote = featureArgsVal ? `, feature-args: \`${featureArgsVal}\`` : '';
const blocksDesc = bigBlocks ? 'blocks: `big`' : `${blocks} blocks, ${warmup} warmup blocks`;
const config = `**Config:** ${blocksDesc}, baseline: \`${baseline}\`, feature: \`${feature}\`${samplyNote}${noSlackNote}${coresNote}${rethNPNote}${abbaNote}${waitTimeNote}${baselineArgsNote}${featureArgsNote}`;
const config = `**Config:** ${blocksDesc}, baseline: \`${baseline}\`, feature: \`${feature}\`${samplyNote}${coresNote}${rethNPNote}${waitTimeNote}${baselineArgsNote}${featureArgsNote}`;
const { data: comment } = await github.rest.issues.createComment({
owner: context.repo.owner,
@@ -407,16 +385,12 @@ jobs:
const baseline = '${{ steps.args.outputs.baseline-name }}';
const feature = '${{ steps.args.outputs.feature-name }}';
const samply = '${{ steps.args.outputs.samply }}' === 'true';
const noSlack = '${{ steps.args.outputs.no-slack }}' === 'true';
const bigBlocks = '${{ steps.args.outputs.big-blocks }}' === 'true';
const samplyNote = samply ? ', samply: `enabled`' : '';
const noSlackNote = noSlack ? ', no-slack' : '';
const cores = '${{ steps.args.outputs.cores }}';
const coresNote = cores && cores !== '0' ? `, cores: \`${cores}\`` : '';
const rethNP = '${{ steps.args.outputs.reth-new-payload }}' !== 'false';
const rethNPNote = !rethNP ? ', reth_newPayload: `disabled`' : '';
const abbaEnabled = '${{ steps.args.outputs.abba }}' !== 'false';
const abbaNote = !abbaEnabled ? ', abba: `disabled`' : '';
const waitTimeVal = '${{ steps.args.outputs.wait-time }}';
const waitTimeNote = waitTimeVal ? `, wait-time: \`${waitTimeVal}\`` : '';
const baselineArgsVal = '${{ steps.args.outputs.baseline-args }}';
@@ -424,7 +398,7 @@ jobs:
const featureArgsVal = '${{ steps.args.outputs.feature-args }}';
const featureArgsNote = featureArgsVal ? `, feature-args: \`${featureArgsVal}\`` : '';
const blocksDesc = bigBlocks ? 'blocks: `big`' : `${blocks} blocks, ${warmup} warmup blocks`;
const config = `**Config:** ${blocksDesc}, baseline: \`${baseline}\`, feature: \`${feature}\`${samplyNote}${noSlackNote}${coresNote}${rethNPNote}${abbaNote}${waitTimeNote}${baselineArgsNote}${featureArgsNote}`;
const config = `**Config:** ${blocksDesc}, baseline: \`${baseline}\`, feature: \`${feature}\`${samplyNote}${coresNote}${rethNPNote}${waitTimeNote}${baselineArgsNote}${featureArgsNote}`;
const runUrl = `${context.serverUrl}/${context.repo.owner}/${context.repo.repo}/actions/runs/${context.runId}`;
const numRunners = parseInt(process.env.BENCH_RUNNERS) || 1;
@@ -476,7 +450,7 @@ jobs:
reth-bench:
needs: reth-bench-ack
name: reth-bench
runs-on: [self-hosted, Linux, X64, available]
runs-on: [self-hosted, Linux, X64]
timeout-minutes: 120
env:
BENCH_RPC_URL: https://ethereum.reth.rs/rpc
@@ -493,12 +467,7 @@ jobs:
BENCH_WAIT_TIME: ${{ needs.reth-bench-ack.outputs.wait-time }}
BENCH_BASELINE_ARGS: ${{ needs.reth-bench-ack.outputs.baseline-args }}
BENCH_FEATURE_ARGS: ${{ needs.reth-bench-ack.outputs.feature-args }}
BENCH_ABBA: ${{ needs.reth-bench-ack.outputs.abba }}
BENCH_COMMENT_ID: ${{ needs.reth-bench-ack.outputs.comment-id }}
BENCH_NO_SLACK: ${{ needs.reth-bench-ack.outputs.no-slack }}
BENCH_METRICS_ADDR: "127.0.0.1:9100"
BENCH_OTLP_TRACES_ENDPOINT: ${{ secrets.BENCH_OTLP_TRACES_ENDPOINT }}
BENCH_OTLP_LOGS_ENDPOINT: ${{ secrets.BENCH_OTLP_LOGS_ENDPOINT }}
steps:
- name: Clean up previous bench-work
run: sudo rm -rf "$BENCH_WORK_DIR" 2>/dev/null || true
@@ -549,16 +518,12 @@ jobs:
const baseline = '${{ needs.reth-bench-ack.outputs.baseline-name }}';
const feature = '${{ needs.reth-bench-ack.outputs.feature-name }}';
const samply = process.env.BENCH_SAMPLY === 'true';
const noSlack = process.env.BENCH_NO_SLACK === 'true';
const bigBlocks = process.env.BENCH_BIG_BLOCKS === 'true';
const samplyNote = samply ? ', samply: `enabled`' : '';
const noSlackNote = noSlack ? ', no-slack' : '';
const cores = process.env.BENCH_CORES || '0';
const coresNote = cores && cores !== '0' ? `, cores: \`${cores}\`` : '';
const rethNP = (process.env.BENCH_RETH_NEW_PAYLOAD || 'true') !== 'false';
const rethNPNote = !rethNP ? ', reth_newPayload: `disabled`' : '';
const abbaEnabled = (process.env.BENCH_ABBA || 'true') !== 'false';
const abbaNote = !abbaEnabled ? ', abba: `disabled`' : '';
const waitTimeVal = process.env.BENCH_WAIT_TIME || '';
const waitTimeNote = waitTimeVal ? `, wait-time: \`${waitTimeVal}\`` : '';
const baselineArgsVal = process.env.BENCH_BASELINE_ARGS || '';
@@ -566,7 +531,7 @@ jobs:
const featureArgsVal = process.env.BENCH_FEATURE_ARGS || '';
const featureArgsNote = featureArgsVal ? `, feature-args: \`${featureArgsVal}\`` : '';
const blocksDesc = bigBlocks ? 'blocks: `big`' : `${blocks} blocks, ${warmup} warmup blocks`;
core.exportVariable('BENCH_CONFIG', `**Config:** ${blocksDesc}, baseline: \`${baseline}\`, feature: \`${feature}\`${samplyNote}${noSlackNote}${coresNote}${rethNPNote}${abbaNote}${waitTimeNote}${baselineArgsNote}${featureArgsNote}`);
core.exportVariable('BENCH_CONFIG', `**Config:** ${blocksDesc}, baseline: \`${baseline}\`, feature: \`${feature}\`${samplyNote}${coresNote}${rethNPNote}${waitTimeNote}${baselineArgsNote}${featureArgsNote}`);
const { buildBody } = require('./.github/scripts/bench-update-status.js');
await github.rest.issues.updateComment({
@@ -845,26 +810,6 @@ jobs:
fi
echo "Payload files: $(find "$BIG_BLOCKS_DIR/payloads" -name '*.json' | wc -l)"
- name: Start metrics proxy
run: |
BENCH_ID="ci-${{ github.run_id }}"
BENCH_REFERENCE_EPOCH=$(date +%s)
echo "BENCH_ID=${BENCH_ID}" >> "$GITHUB_ENV"
echo "BENCH_REFERENCE_EPOCH=${BENCH_REFERENCE_EPOCH}" >> "$GITHUB_ENV"
LABELS_FILE="/tmp/bench-metrics-labels.json"
echo '{}' > "$LABELS_FILE"
echo "BENCH_LABELS_FILE=${LABELS_FILE}" >> "$GITHUB_ENV"
python3 .github/scripts/bench-metrics-proxy.py \
--labels "$LABELS_FILE" \
--upstream "http://${BENCH_METRICS_ADDR}/" \
--subnet 10.10.0.0/24 \
--port 9090 &
PROXY_PID=$!
echo "BENCH_METRICS_PROXY_PID=${PROXY_PID}" >> "$GITHUB_ENV"
echo "Metrics proxy started (PID $PROXY_PID)"
- name: Update status (running benchmarks)
if: success() && env.BENCH_COMMENT_ID
uses: actions/github-script@v8
@@ -878,64 +823,19 @@ jobs:
# thermal drift and cache warming.
- name: "Run benchmark: baseline (1/2)"
id: run-baseline-1
env:
BASELINE_REF: ${{ steps.refs.outputs.baseline-ref }}
OTEL_RESOURCE_ATTRIBUTES: "benchmark_id=${{ env.BENCH_ID }},benchmark_run=baseline-1,run_type=baseline,git_ref=${{ steps.refs.outputs.baseline-ref }}"
run: |
cat > "$BENCH_LABELS_FILE" <<LABELS
{"benchmark_run":"baseline-1","run_type":"baseline","git_ref":"${BASELINE_REF}","bench_sha":"${BASELINE_REF}","benchmark_id":"${BENCH_ID}","run_start_epoch":"$(date +%s)","reference_epoch":"${BENCH_REFERENCE_EPOCH}"}
LABELS
taskset -c 0 .github/scripts/bench-reth-run.sh baseline ../reth-baseline/target/profiling/reth "$BENCH_WORK_DIR/baseline-1"
run: taskset -c 0 .github/scripts/bench-reth-run.sh baseline ../reth-baseline/target/profiling/reth "$BENCH_WORK_DIR/baseline-1"
- name: "Run benchmark: feature (1/2)"
id: run-feature-1
env:
FEATURE_REF: ${{ steps.refs.outputs.feature-ref }}
OTEL_RESOURCE_ATTRIBUTES: "benchmark_id=${{ env.BENCH_ID }},benchmark_run=feature-1,run_type=feature,git_ref=${{ steps.refs.outputs.feature-ref }}"
run: |
cat > "$BENCH_LABELS_FILE" <<LABELS
{"benchmark_run":"feature-1","run_type":"feature","git_ref":"${FEATURE_REF}","bench_sha":"${FEATURE_REF}","benchmark_id":"${BENCH_ID}","run_start_epoch":"$(date +%s)","reference_epoch":"${BENCH_REFERENCE_EPOCH}"}
LABELS
taskset -c 0 .github/scripts/bench-reth-run.sh feature ../reth-feature/target/profiling/reth "$BENCH_WORK_DIR/feature-1"
run: taskset -c 0 .github/scripts/bench-reth-run.sh feature ../reth-feature/target/profiling/reth "$BENCH_WORK_DIR/feature-1"
- name: "Run benchmark: feature (2/2)"
if: env.BENCH_ABBA != 'false'
id: run-feature-2
env:
FEATURE_REF: ${{ steps.refs.outputs.feature-ref }}
OTEL_RESOURCE_ATTRIBUTES: "benchmark_id=${{ env.BENCH_ID }},benchmark_run=feature-2,run_type=feature,git_ref=${{ steps.refs.outputs.feature-ref }}"
run: |
cat > "$BENCH_LABELS_FILE" <<LABELS
{"benchmark_run":"feature-2","run_type":"feature","git_ref":"${FEATURE_REF}","bench_sha":"${FEATURE_REF}","benchmark_id":"${BENCH_ID}","run_start_epoch":"$(date +%s)","reference_epoch":"${BENCH_REFERENCE_EPOCH}"}
LABELS
taskset -c 0 .github/scripts/bench-reth-run.sh feature ../reth-feature/target/profiling/reth "$BENCH_WORK_DIR/feature-2"
run: taskset -c 0 .github/scripts/bench-reth-run.sh feature ../reth-feature/target/profiling/reth "$BENCH_WORK_DIR/feature-2"
- name: "Run benchmark: baseline (2/2)"
if: env.BENCH_ABBA != 'false'
id: run-baseline-2
env:
BASELINE_REF: ${{ steps.refs.outputs.baseline-ref }}
OTEL_RESOURCE_ATTRIBUTES: "benchmark_id=${{ env.BENCH_ID }},benchmark_run=baseline-2,run_type=baseline,git_ref=${{ steps.refs.outputs.baseline-ref }}"
run: |
LAST_RUN_START=$(date +%s)
echo "BENCH_LAST_RUN_START=${LAST_RUN_START}" >> "$GITHUB_ENV"
cat > "$BENCH_LABELS_FILE" <<LABELS
{"benchmark_run":"baseline-2","run_type":"baseline","git_ref":"${BASELINE_REF}","bench_sha":"${BASELINE_REF}","benchmark_id":"${BENCH_ID}","run_start_epoch":"${LAST_RUN_START}","reference_epoch":"${BENCH_REFERENCE_EPOCH}"}
LABELS
taskset -c 0 .github/scripts/bench-reth-run.sh baseline ../reth-baseline/target/profiling/reth "$BENCH_WORK_DIR/baseline-2"
- name: Stop metrics proxy & generate Grafana URL
id: metrics
if: "!cancelled()"
run: |
kill "$BENCH_METRICS_PROXY_PID" 2>/dev/null || true
LAST_RUN_DURATION=$(( $(date +%s) - BENCH_LAST_RUN_START ))
FROM_MS=$(( BENCH_REFERENCE_EPOCH * 1000 ))
TO_MS=$(( (BENCH_REFERENCE_EPOCH + LAST_RUN_DURATION) * 1000 ))
GRAFANA_URL="https://tempoxyz.grafana.net/d/reth-bench-ghr/reth-bench-ghr?orgId=1&from=${FROM_MS}&to=${TO_MS}&timezone=browser&var-datasource=ef57fux92e9z4e&var-job=reth-bench&var-benchmark_id=${BENCH_ID}&var-benchmark_run=\$__all"
echo "grafana-url=${GRAFANA_URL}" >> "$GITHUB_OUTPUT"
echo "Grafana URL: ${GRAFANA_URL}"
run: taskset -c 0 .github/scripts/bench-reth-run.sh baseline ../reth-baseline/target/profiling/reth "$BENCH_WORK_DIR/baseline-2"
- name: Scan logs for errors
if: "!cancelled()"
@@ -1037,14 +937,8 @@ jobs:
SUMMARY_ARGS="$SUMMARY_ARGS --baseline-name ${BASELINE_NAME}"
SUMMARY_ARGS="$SUMMARY_ARGS --feature-name ${FEATURE_NAME}"
SUMMARY_ARGS="$SUMMARY_ARGS --feature-ref ${FEATURE_REF}"
BASELINE_CSVS="$BENCH_WORK_DIR/baseline-1/combined_latency.csv"
FEATURE_CSVS="$BENCH_WORK_DIR/feature-1/combined_latency.csv"
if [ "${BENCH_ABBA:-true}" = "true" ]; then
BASELINE_CSVS="$BASELINE_CSVS $BENCH_WORK_DIR/baseline-2/combined_latency.csv"
FEATURE_CSVS="$FEATURE_CSVS $BENCH_WORK_DIR/feature-2/combined_latency.csv"
fi
SUMMARY_ARGS="$SUMMARY_ARGS --baseline-csv $BASELINE_CSVS"
SUMMARY_ARGS="$SUMMARY_ARGS --feature-csv $FEATURE_CSVS"
SUMMARY_ARGS="$SUMMARY_ARGS --baseline-csv $BENCH_WORK_DIR/baseline-1/combined_latency.csv $BENCH_WORK_DIR/baseline-2/combined_latency.csv"
SUMMARY_ARGS="$SUMMARY_ARGS --feature-csv $BENCH_WORK_DIR/feature-1/combined_latency.csv $BENCH_WORK_DIR/feature-2/combined_latency.csv"
SUMMARY_ARGS="$SUMMARY_ARGS --gas-csv $BENCH_WORK_DIR/feature-1/total_gas.csv"
if [ "$BEHIND_BASELINE" -gt 0 ]; then
SUMMARY_ARGS="$SUMMARY_ARGS --behind-baseline $BEHIND_BASELINE"
@@ -1057,10 +951,6 @@ jobs:
SUMMARY_ARGS="$SUMMARY_ARGS --gas-ramp-blocks $(cat "$GAS_RAMP_FILE" | tr -d '[:space:]')"
fi
fi
GRAFANA_URL='${{ steps.metrics.outputs.grafana-url }}'
if [ -n "$GRAFANA_URL" ]; then
SUMMARY_ARGS="$SUMMARY_ARGS --grafana-url $GRAFANA_URL"
fi
# shellcheck disable=SC2086
python3 .github/scripts/bench-reth-summary.py $SUMMARY_ARGS
@@ -1071,14 +961,8 @@ jobs:
FEATURE_NAME: ${{ steps.refs.outputs.feature-name }}
run: |
CHART_ARGS="--output-dir $BENCH_WORK_DIR/charts"
FEATURE_CSVS="$BENCH_WORK_DIR/feature-1/combined_latency.csv"
BASELINE_CSVS="$BENCH_WORK_DIR/baseline-1/combined_latency.csv"
if [ "${BENCH_ABBA:-true}" = "true" ]; then
FEATURE_CSVS="$FEATURE_CSVS $BENCH_WORK_DIR/feature-2/combined_latency.csv"
BASELINE_CSVS="$BASELINE_CSVS $BENCH_WORK_DIR/baseline-2/combined_latency.csv"
fi
CHART_ARGS="$CHART_ARGS --feature $FEATURE_CSVS"
CHART_ARGS="$CHART_ARGS --baseline $BASELINE_CSVS"
CHART_ARGS="$CHART_ARGS --feature $BENCH_WORK_DIR/feature-1/combined_latency.csv $BENCH_WORK_DIR/feature-2/combined_latency.csv"
CHART_ARGS="$CHART_ARGS --baseline $BENCH_WORK_DIR/baseline-1/combined_latency.csv $BENCH_WORK_DIR/baseline-2/combined_latency.csv"
CHART_ARGS="$CHART_ARGS --baseline-name ${BASELINE_NAME}"
CHART_ARGS="$CHART_ARGS --feature-name ${FEATURE_NAME}"
# shellcheck disable=SC2086
@@ -1118,7 +1002,7 @@ jobs:
rm -rf "${TMP_DIR}"
- name: Compare & comment
if: success() && env.BENCH_COMMENT_ID
if: success()
uses: actions/github-script@v8
with:
github-token: ${{ secrets.DEREK_PAT }}
@@ -1154,8 +1038,7 @@ jobs:
// Samply profile links (URLs point directly to Firefox Profiler)
if (process.env.BENCH_SAMPLY === 'true') {
const abba = (process.env.BENCH_ABBA || 'true') !== 'false';
const runs = abba ? ['baseline-1', 'feature-1', 'feature-2', 'baseline-2'] : ['baseline-1', 'feature-1'];
const runs = ['baseline-1', 'feature-1', 'feature-2', 'baseline-2'];
const links = [];
for (const run of runs) {
try {
@@ -1170,12 +1053,6 @@ jobs:
}
}
// Grafana dashboard link
const grafanaUrl = '${{ steps.metrics.outputs.grafana-url }}';
if (grafanaUrl) {
comment += `\n\n### Grafana Dashboard\n\n[View real-time metrics](${grafanaUrl})\n`;
}
// Node errors (panics / ERROR logs)
try {
const errors = fs.readFileSync(process.env.BENCH_WORK_DIR + '/errors.md', 'utf8');
@@ -1201,7 +1078,7 @@ jobs:
}
- name: Send Slack notification (success)
if: success() && env.BENCH_NO_SLACK != 'true'
if: success()
uses: actions/github-script@v8
env:
SLACK_BENCH_BOT_TOKEN: ${{ secrets.SLACK_BENCH_BOT_TOKEN }}
@@ -1217,13 +1094,12 @@ jobs:
with:
github-token: ${{ secrets.DEREK_PAT }}
script: |
const abba = (process.env.BENCH_ABBA || 'true') !== 'false';
const steps_status = [
['building binaries${{ steps.snapshot-check.outputs.needed == 'true' && ' & downloading snapshot' || '' }}', '${{ steps.build.outcome }}'],
['running baseline benchmark (1/2)', '${{ steps.run-baseline-1.outcome }}'],
['running feature benchmark (1/2)', '${{ steps.run-feature-1.outcome }}'],
...(abba ? [['running feature benchmark (2/2)', '${{ steps.run-feature-2.outcome }}']] : []),
...(abba ? [['running baseline benchmark (2/2)', '${{ steps.run-baseline-2.outcome }}']] : []),
['running feature benchmark (2/2)', '${{ steps.run-feature-2.outcome }}'],
['running baseline benchmark (2/2)', '${{ steps.run-baseline-2.outcome }}'],
];
const failed = steps_status.find(([, o]) => o === 'failure');
const failedStep = failed ? failed[0] : 'unknown step';
@@ -1252,13 +1128,12 @@ jobs:
SLACK_BENCH_CHANNEL: ${{ secrets.SLACK_BENCH_CHANNEL }}
with:
script: |
const abba = (process.env.BENCH_ABBA || 'true') !== 'false';
const steps_status = [
['building binaries${{ steps.snapshot-check.outputs.needed == 'true' && ' & downloading snapshot' || '' }}', '${{ steps.build.outcome }}'],
['running baseline benchmark (1/2)', '${{ steps.run-baseline-1.outcome }}'],
['running feature benchmark (1/2)', '${{ steps.run-feature-1.outcome }}'],
...(abba ? [['running feature benchmark (2/2)', '${{ steps.run-feature-2.outcome }}']] : []),
...(abba ? [['running baseline benchmark (2/2)', '${{ steps.run-baseline-2.outcome }}']] : []),
['running feature benchmark (2/2)', '${{ steps.run-feature-2.outcome }}'],
['running baseline benchmark (2/2)', '${{ steps.run-baseline-2.outcome }}'],
];
const failed = steps_status.find(([, o]) => o === 'failure');
const failedStep = failed ? failed[0] : 'unknown step';

View File

@@ -6,7 +6,7 @@ on:
hive_target:
required: true
type: string
description: "Docker bake target to build (e.g. hive)"
description: "Docker bake target to build (e.g. hive-stable, hive-edge)"
artifact_name:
required: false
type: string

View File

@@ -28,30 +28,12 @@ on:
required: false
type: boolean
default: false
pgo:
description: "Enable PGO profiling"
required: false
type: boolean
default: false
pgo_blocks:
description: "Number of blocks to execute for PGO profiling"
required: false
type: string
default: "20"
jobs:
collect-pgo-profile:
if: github.repository == 'paradigmxyz/reth' && github.event_name == 'workflow_dispatch' && inputs.pgo
uses: ./.github/workflows/pgo-profile.yml
with:
pgo_blocks: ${{ inputs.pgo_blocks || '20' }}
secrets: inherit
build:
if: github.repository == 'paradigmxyz/reth' && !failure() && !cancelled()
if: github.repository == 'paradigmxyz/reth'
name: Build Docker images
runs-on: ubuntu-24.04
needs: collect-pgo-profile
permissions:
packages: write
contents: read
@@ -76,30 +58,6 @@ jobs:
echo "describe=$(git describe --always --tags)" >> "$GITHUB_OUTPUT"
echo "dirty=false" >> "$GITHUB_OUTPUT"
- name: Download pre-collected PGO profile
if: ${{ github.event_name == 'workflow_dispatch' && inputs.pgo }}
uses: actions/download-artifact@v7
with:
name: pgo-profdata
path: dist
- name: Configure PGO build args
id: pgo
run: |
if [[ "${{ github.event_name }}" == "workflow_dispatch" ]] && [[ "${{ inputs.pgo }}" == "true" ]]; then
if [ ! -f dist/merged.profdata ]; then
echo "::error::Expected dist/merged.profdata from collect-pgo-profile job"
exit 1
fi
echo "use_pgo_bolt=true" >> "$GITHUB_OUTPUT"
echo "pgo_profdata=dist/merged.profdata" >> "$GITHUB_OUTPUT"
echo "Using pre-collected PGO profile from collect-pgo-profile job"
else
echo "use_pgo_bolt=false" >> "$GITHUB_OUTPUT"
echo "pgo_profdata=" >> "$GITHUB_OUTPUT"
echo "PGO disabled"
fi
- name: Determine build parameters
id: params
run: |
@@ -149,9 +107,6 @@ jobs:
push: ${{ !(github.event_name == 'workflow_dispatch' && inputs.dry_run) }}
set: |
${{ steps.params.outputs.ethereum_set }}
*.args.USE_PGO_BOLT=${{ steps.pgo.outputs.use_pgo_bolt }}
*.args.PGO_PROFDATA=${{ steps.pgo.outputs.pgo_profdata }}
*.args.STRIP_SYMBOLS=false
- name: Verify image architectures
env:

View File

@@ -63,6 +63,6 @@ jobs:
run: |
cargo nextest run \
--no-fail-fast \
--locked \
--locked --features "edge" \
-p reth-e2e-test-utils \
-E 'binary(rocksdb)'

View File

@@ -15,11 +15,18 @@ concurrency:
cancel-in-progress: true
jobs:
build-reth:
build-reth-stable:
uses: ./.github/workflows/docker-test.yml
with:
hive_target: hive
artifact_name: "reth"
hive_target: hive-stable
artifact_name: "reth-stable"
secrets: inherit
build-reth-edge:
uses: ./.github/workflows/docker-test.yml
with:
hive_target: hive-edge
artifact_name: "reth-edge"
secrets: inherit
prepare-hive:
@@ -77,6 +84,7 @@ jobs:
strategy:
fail-fast: false
matrix:
storage: [stable, edge]
# ethereum/rpc to be deprecated:
# https://github.com/ethereum/hive/pull/1117
scenario:
@@ -176,9 +184,10 @@ jobs:
- sim: ethereum/eels/consume-rlp
limit: .*tests/paris.*
needs:
- build-reth
- build-reth-stable
- build-reth-edge
- prepare-hive
name: ${{ matrix.scenario.sim }}${{ matrix.scenario.limit && format(' - {0}', matrix.scenario.limit) }}
name: ${{ matrix.storage }} / ${{ matrix.scenario.sim }}${{ matrix.scenario.limit && format(' - {0}', matrix.scenario.limit) }}
# Use larger runners for eels tests to avoid OOM runner crashes
runs-on: ${{ github.repository == 'paradigmxyz/reth' && (contains(matrix.scenario.sim, 'eels') && 'depot-ubuntu-latest-8' || 'depot-ubuntu-latest-4') || 'ubuntu-latest' }}
permissions:
@@ -197,7 +206,7 @@ jobs:
- name: Download reth image
uses: actions/download-artifact@v8
with:
name: reth
name: reth-${{ matrix.storage }}
path: /tmp
- name: Load Docker images

View File

@@ -22,7 +22,7 @@ concurrency:
jobs:
test:
name: test / ${{ matrix.network }}
name: test / ${{ matrix.network }} / ${{ matrix.storage }}
if: github.event_name != 'schedule'
runs-on: ${{ github.repository == 'paradigmxyz/reth' && 'depot-ubuntu-latest-4' || 'ubuntu-latest' }}
env:
@@ -30,6 +30,7 @@ jobs:
strategy:
matrix:
network: ["ethereum"]
storage: ["stable", "edge"]
timeout-minutes: 60
steps:
- uses: actions/checkout@v6
@@ -46,7 +47,7 @@ jobs:
run: |
cargo nextest run \
--no-fail-fast \
--locked --features "asm-keccak ${{ matrix.network }}" \
--locked --features "asm-keccak ${{ matrix.network }} ${{ matrix.storage == 'edge' && 'edge' || '' }}" \
--workspace --exclude ef-tests \
-E "kind(test) and not binary(e2e_testsuite)"

View File

@@ -1,107 +0,0 @@
name: pgo-profile
on:
workflow_call:
inputs:
pgo_blocks:
description: "Number of blocks to execute for PGO profiling"
required: false
type: string
default: "20"
workflow_dispatch:
inputs:
pgo_blocks:
description: "Number of blocks to execute for PGO profiling"
required: false
type: string
default: "20"
jobs:
collect:
name: collect PGO profiles
runs-on: [self-hosted, Linux, X64]
timeout-minutes: 180
env:
SCHELK_MOUNT: /reth-bench
BENCH_RPC_URL: https://ethereum.reth.rs/rpc
RUSTC_WRAPPER: "sccache"
steps:
- uses: actions/checkout@v6
with:
submodules: true
- uses: rui314/setup-mold@v1
- uses: dtolnay/rust-toolchain@stable
with:
target: x86_64-unknown-linux-gnu
- uses: mozilla-actions/sccache-action@v0.0.9
continue-on-error: true
- uses: Swatinem/rust-cache@v2
with:
cache-on-failure: true
- name: Install dependencies
run: |
sudo apt-get update -qq
sudo apt-get install -y --no-install-recommends \
dmsetup lsb-release wget linux-tools-"$(uname -r)" || \
sudo apt-get install -y --no-install-recommends linux-tools-generic
- name: Download snapshot if needed
env:
GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }}
BENCH_REPO: ${{ github.repository }}
run: |
if ! .github/scripts/bench-reth-snapshot.sh --check; then
echo "Snapshot outdated or missing, downloading..."
.github/scripts/bench-reth-snapshot.sh
fi
- name: Mount snapshot
run: |
sudo pkill -9 reth || true
sleep 1
if mountpoint -q "$SCHELK_MOUNT"; then
sudo umount -l "$SCHELK_MOUNT" || true
sudo schelk recover -y || true
fi
sudo schelk mount -y
sync
sudo sh -c 'echo 3 > /proc/sys/vm/drop_caches'
- name: Collect PGO profile
run: |
DATADIR="$SCHELK_MOUNT/datadir" \
RPC_URL="$BENCH_RPC_URL" \
PGO_BLOCKS="${{ inputs.pgo_blocks || '20' }}" \
BOLT_BLOCKS="${{ inputs.pgo_blocks || '20' }}" \
COLLECT_PGO_ONLY=true \
SKIP_BOLT=true \
PROFILE=maxperf-symbols \
FEATURES="jemalloc,asm-keccak,min-debug-logs" \
TARGET=x86_64-unknown-linux-gnu \
EXTRA_RUSTFLAGS="-C target-cpu=x86-64-v3 -C target-feature=+pclmulqdq" \
.github/scripts/build_pgo_bolt.sh
- name: Show PGO profile stats
run: |
LLVM_PROFDATA=$(find "$(rustc --print sysroot)" -name llvm-profdata -type f | head -1)
if [ -z "$LLVM_PROFDATA" ]; then
echo "::error::llvm-profdata not found in rust toolchain"
exit 1
fi
"$LLVM_PROFDATA" show --detailed-summary --topn=20 target/pgo-profiles/merged.profdata
- name: Upload PGO profile
uses: actions/upload-artifact@v7
with:
name: pgo-profdata
path: target/pgo-profiles/merged.profdata
retention-days: 1
- name: Recover snapshot
if: always()
run: |
if mountpoint -q "$SCHELK_MOUNT"; then
sudo umount -l "$SCHELK_MOUNT" || true
sudo schelk recover -y || true
fi

View File

@@ -13,14 +13,6 @@ on:
description: "Enable dry run mode (builds artifacts but skips uploads and release creation)"
type: boolean
default: false
pgo:
description: "Enable PGO profiling"
type: boolean
default: false
pgo_blocks:
description: "Number of blocks to execute for PGO profiling on self-hosted runner"
type: string
default: "20"
env:
REPO_NAME: ${{ github.repository_owner }}/reth
@@ -77,6 +69,11 @@ jobs:
fail-fast: true
matrix:
configs:
- target: x86_64-unknown-linux-gnu
os: ubuntu-24.04
profile: maxperf
allow_fail: false
rustflags: "-C target-cpu=x86-64-v3 -C target-feature=+pclmulqdq"
- target: aarch64-unknown-linux-gnu
os: ubuntu-24.04-arm
profile: maxperf
@@ -157,93 +154,11 @@ jobs:
name: ${{ matrix.build.binary }}-${{ needs.extract-version.outputs.VERSION }}-${{ matrix.configs.target }}.tar.gz.asc
path: ${{ matrix.build.binary }}-${{ needs.extract-version.outputs.VERSION }}-${{ matrix.configs.target }}.tar.gz.asc
collect-pgo-profile:
if: github.event_name == 'workflow_dispatch' && inputs.pgo
uses: ./.github/workflows/pgo-profile.yml
with:
pgo_blocks: ${{ inputs.pgo_blocks || '20' }}
secrets: inherit
build-pgo:
if: github.event_name == 'workflow_dispatch' && inputs.pgo
name: build release (x86_64-linux PGO+BOLT)
runs-on: [self-hosted, Linux, X64]
needs: [extract-version, collect-pgo-profile]
timeout-minutes: 120
env:
RUSTC_WRAPPER: "sccache"
steps:
- uses: actions/checkout@v6
with:
submodules: true
- uses: rui314/setup-mold@v1
- uses: dtolnay/rust-toolchain@stable
with:
target: x86_64-unknown-linux-gnu
- uses: mozilla-actions/sccache-action@v0.0.9
continue-on-error: true
- uses: Swatinem/rust-cache@v2
with:
cache-on-failure: true
- name: Download pre-collected PGO profile
uses: actions/download-artifact@v7
with:
name: pgo-profdata
path: dist
- name: Verify PGO profile artifact
run: |
test -f dist/merged.profdata
ls -lh dist/merged.profdata
- name: Build Reth with PGO+BOLT
run: |
SKIP_BOLT=true \
PGO_PROFDATA="$PWD/dist/merged.profdata" \
PROFILE=maxperf-symbols \
FEATURES="jemalloc,asm-keccak,min-debug-logs" \
TARGET=x86_64-unknown-linux-gnu \
EXTRA_RUSTFLAGS="-C target-cpu=x86-64-v3 -C target-feature=+pclmulqdq" \
.github/scripts/build_pgo_bolt.sh
- name: Move binary
run: |
mkdir artifacts
mv target/maxperf-symbols/reth ./artifacts
- name: Configure GPG and create artifacts
env:
GPG_SIGNING_KEY: ${{ secrets.GPG_SIGNING_KEY }}
GPG_PASSPHRASE: ${{ secrets.GPG_PASSPHRASE }}
run: |
export GPG_TTY=$(tty)
echo -n "$GPG_SIGNING_KEY" | base64 --decode | gpg --batch --import
cd artifacts
tar -czf reth-${{ needs.extract-version.outputs.VERSION }}-x86_64-unknown-linux-gnu.tar.gz reth*
echo "$GPG_PASSPHRASE" | gpg --passphrase-fd 0 --pinentry-mode loopback --batch -ab reth-${{ needs.extract-version.outputs.VERSION }}-x86_64-unknown-linux-gnu.tar.gz
mv *tar.gz* ..
shell: bash
- name: Upload artifact
if: ${{ github.event.inputs.dry_run != 'true' }}
uses: actions/upload-artifact@v6
with:
name: reth-${{ needs.extract-version.outputs.VERSION }}-x86_64-unknown-linux-gnu.tar.gz
path: reth-${{ needs.extract-version.outputs.VERSION }}-x86_64-unknown-linux-gnu.tar.gz
- name: Upload signature
if: ${{ github.event.inputs.dry_run != 'true' }}
uses: actions/upload-artifact@v6
with:
name: reth-${{ needs.extract-version.outputs.VERSION }}-x86_64-unknown-linux-gnu.tar.gz.asc
path: reth-${{ needs.extract-version.outputs.VERSION }}-x86_64-unknown-linux-gnu.tar.gz.asc
draft-release:
name: draft release
runs-on: ubuntu-latest
needs: [build, build-pgo, extract-version]
if: ${{ !failure() && !cancelled() && github.event.inputs.dry_run != 'true' }}
needs: [build, extract-version]
if: ${{ github.event.inputs.dry_run != 'true' }}
env:
VERSION: ${{ needs.extract-version.outputs.VERSION }}
permissions:

View File

@@ -51,12 +51,15 @@ jobs:
- name: Run execution stage
run: |
reth stage run execution --from ${{ env.FROM_BLOCK }} --to ${{ env.TO_BLOCK }} --commit --checkpoints
# NOTE: account-hashing, storage-hashing, and hashing stages are omitted.
# With storage v2 (now default), these stages are no-ops because the
# execution stage writes directly to HashedAccounts/HashedStorages.
# Running them here is harmful: `stage run` unwinds before executing,
# and the unwind reverts the hashed state that execution wrote, but
# the no-op execute never restores it — causing merkle to fail.
- name: Run account-hashing stage
run: |
reth stage run account-hashing --from ${{ env.FROM_BLOCK }} --to ${{ env.TO_BLOCK }} --commit --checkpoints
- name: Run storage hashing stage
run: |
reth stage run storage-hashing --from ${{ env.FROM_BLOCK }} --to ${{ env.TO_BLOCK }} --commit --checkpoints
- name: Run hashing stage
run: |
reth stage run hashing --from ${{ env.FROM_BLOCK }} --to ${{ env.TO_BLOCK }} --commit --checkpoints
- name: Run merkle stage
run: |
reth stage run merkle --from ${{ env.FROM_BLOCK }} --to ${{ env.TO_BLOCK }} --commit --checkpoints

View File

@@ -19,13 +19,15 @@ concurrency:
jobs:
test:
name: test / ${{ matrix.type }}
name: test / ${{ matrix.type }} / ${{ matrix.storage }}
runs-on: ${{ github.repository == 'paradigmxyz/reth' && 'depot-ubuntu-latest-4' || 'ubuntu-latest' }}
env:
RUST_BACKTRACE: 1
EDGE_FEATURES: ${{ matrix.storage == 'edge' && 'edge' || '' }}
strategy:
matrix:
type: [ethereum]
storage: [stable, edge]
include:
- type: ethereum
features: asm-keccak ethereum
@@ -48,14 +50,14 @@ jobs:
run: |
cargo nextest run \
--no-fail-fast \
--features "${{ matrix.features }}" --locked \
--features "${{ matrix.features }} $EDGE_FEATURES" --locked \
${{ matrix.exclude_args }} --workspace \
--exclude ef-tests --no-tests=warn \
-E "!kind(test) and not binary(e2e_testsuite)"
state:
name: Ethereum state tests
runs-on: ${{ github.repository == 'paradigmxyz/reth' && 'depot-ubuntu-latest-8' || 'ubuntu-latest' }}
runs-on: ${{ github.repository == 'paradigmxyz/reth' && 'depot-ubuntu-latest-4' || 'ubuntu-latest' }}
env:
RUST_LOG: info,sync=error
RUST_BACKTRACE: 1

407
Cargo.lock generated

File diff suppressed because it is too large Load Diff

View File

@@ -1,5 +1,5 @@
[workspace.package]
version = "1.11.3"
version = "1.11.1"
edition = "2024"
rust-version = "1.93"
license = "MIT OR Apache-2.0"
@@ -539,8 +539,6 @@ serde_json = { version = "1.0", default-features = false, features = ["alloc"] }
serde_with = { version = "3", default-features = false, features = ["macros"] }
sha2 = { version = "0.10", default-features = false }
shlex = "1.3"
# https://github.com/orlp/slotmap/pull/148
slotmap = { git = "https://github.com/DaniPopes/slotmap.git", branch = "dani/shrink-methods" }
smallvec = "1"
strum = { version = "0.27", default-features = false }
strum_macros = "0.27"

View File

@@ -1,10 +1,8 @@
# syntax=docker/dockerfile:1
# Dockerfile for reth, optimized for Depot builds
# Supports PGO+BOLT optimization for maximum performance
# Usage:
# reth: --build-arg BINARY=reth
# PGO+BOLT: --build-arg USE_PGO_BOLT=true (Linux x86_64/aarch64 only)
FROM rust:1.93 AS builder
WORKDIR /app
@@ -45,18 +43,6 @@ ENV VERGEN_GIT_SHA=$VERGEN_GIT_SHA
ENV VERGEN_GIT_DESCRIBE=$VERGEN_GIT_DESCRIBE
ENV VERGEN_GIT_DIRTY=$VERGEN_GIT_DIRTY
# Enable PGO+BOLT optimization (Linux only)
ARG USE_PGO_BOLT=false
ENV USE_PGO_BOLT=$USE_PGO_BOLT
# Optional path to a pre-collected merged.profdata file in build context.
ARG PGO_PROFDATA=""
ENV PGO_PROFDATA=$PGO_PROFDATA
# Whether to strip debug symbols from PGO-built binaries.
ARG STRIP_SYMBOLS=true
ENV STRIP_SYMBOLS=$STRIP_SYMBOLS
# Build application
# Platform-specific RUSTFLAGS: amd64 uses x86-64-v3 (Haswell+) with pclmulqdq for rocksdb
ARG TARGETPLATFORM
@@ -67,21 +53,12 @@ RUN --mount=type=secret,id=DEPOT_TOKEN,env=SCCACHE_WEBDAV_TOKEN \
--mount=type=cache,target=$SCCACHE_DIR,sharing=shared \
export RUSTC_WRAPPER=sccache SCCACHE_WEBDAV_ENDPOINT=https://cache.depot.dev SCCACHE_DIR=/sccache && \
sccache --start-server && \
if [ "$USE_PGO_BOLT" = "true" ] && [ "$TARGETPLATFORM" = "linux/amd64" ] && [ -n "$PGO_PROFDATA" ] && [ -f "$PGO_PROFDATA" ]; then \
apt-get update && apt-get install -y -qq lsb-release wget sudo && \
BINARY="$BINARY" PROFILE="$BUILD_PROFILE" FEATURES="$FEATURES" SKIP_BOLT=true STRIP_SYMBOLS="$STRIP_SYMBOLS" PGO_PROFDATA="$PGO_PROFDATA" \
./.github/scripts/build_pgo_bolt.sh; \
else \
if [ "$USE_PGO_BOLT" = "true" ]; then \
echo "PGO requested but pre-collected profile missing at '${PGO_PROFDATA:-<unset>}' - falling back to non-PGO build"; \
fi; \
if [ -n "$RUSTFLAGS" ]; then \
export RUSTFLAGS="$RUSTFLAGS"; \
elif [ "$TARGETPLATFORM" = "linux/amd64" ]; then \
export RUSTFLAGS="-C target-cpu=x86-64-v3 -C target-feature=+pclmulqdq"; \
fi && \
cargo build --profile $BUILD_PROFILE --features "$FEATURES" --locked --bin $BINARY --manifest-path $MANIFEST_PATH/Cargo.toml; \
if [ -n "$RUSTFLAGS" ]; then \
export RUSTFLAGS="$RUSTFLAGS"; \
elif [ "$TARGETPLATFORM" = "linux/amd64" ]; then \
export RUSTFLAGS="-C target-cpu=x86-64-v3 -C target-feature=+pclmulqdq"; \
fi && \
cargo build --profile $BUILD_PROFILE --features "$FEATURES" --locked --bin $BINARY --manifest-path $MANIFEST_PATH/Cargo.toml && \
sccache --show-stats
# Copy binary to a known location (ARG not resolved in COPY)

View File

@@ -7,7 +7,7 @@ use alloy_primitives::address;
use alloy_provider::{network::AnyNetwork, Provider, RootProvider};
use alloy_rpc_client::ClientBuilder;
use alloy_rpc_types_engine::JwtSecret;
use alloy_transport::layers::{RateLimitRetryPolicy, RetryBackoffLayer};
use alloy_transport::layers::RetryBackoffLayer;
use reqwest::Url;
use reth_node_core::args::BenchmarkArgs;
use tracing::info;
@@ -53,15 +53,9 @@ impl BenchContext {
}
}
// set up alloy client for blocks, retrying on 429/503 (default) and 502
let retry_policy =
RateLimitRetryPolicy::default().or(|err: &alloy_transport::TransportError| -> bool {
err.as_transport_err()
.and_then(|t| t.as_http_error())
.is_some_and(|e| e.status == 502)
});
// set up alloy client for blocks
let client = ClientBuilder::default()
.layer(RetryBackoffLayer::new_with_policy(10, 800, u64::MAX, retry_policy))
.layer(RetryBackoffLayer::new(10, 800, u64::MAX))
.http(rpc_url.parse()?);
let block_provider = RootProvider::<AnyNetwork>::new(client);

View File

@@ -235,8 +235,8 @@ pub(crate) fn payload_to_new_payload(
))?,
)
} else {
// Preserve the original RequestsOrHash payload for engine_newPayloadV4.
let requests = prague.requests.clone();
// Extract actual Requests from RequestsOrHash
let requests = prague.requests.requests_hash();
(
version,
serde_json::to_value((

View File

@@ -89,6 +89,7 @@ default = [
"keccak-cache-global",
"asm-keccak",
"min-debug-logs",
"rocksdb",
]
otlp = [
@@ -190,6 +191,8 @@ min-trace-logs = [
]
trie-debug = ["reth-node-builder/trie-debug", "reth-node-core/trie-debug"]
rocksdb = ["reth-ethereum-cli/rocksdb", "reth-node-core/rocksdb"]
edge = ["rocksdb"]
[[bin]]
name = "reth"

View File

@@ -992,7 +992,7 @@ impl<N: NodePrimitives<SignedTx: SignedTransaction>> NewCanonicalChain<N> {
///
/// Returns the new tip for [`Self::Reorg`] and [`Self::Commit`] variants which commit at least
/// 1 new block.
pub fn tip(&self) -> &RecoveredBlock<N::Block> {
pub fn tip(&self) -> &SealedBlock<N::Block> {
match self {
Self::Commit { new } | Self::Reorg { new, .. } => {
new.last().expect("non empty blocks").recovered_block()

View File

@@ -110,6 +110,7 @@ reth-provider = { workspace = true, features = ["test-utils"] }
tempfile.workspace = true
[features]
default = []
arbitrary = [
"dep:proptest",
"dep:arbitrary",
@@ -134,3 +135,6 @@ arbitrary = [
"reth-primitives-traits/arbitrary",
"reth-ethereum-primitives/arbitrary",
]
rocksdb = ["reth-db-common/rocksdb", "reth-stages/rocksdb", "reth-provider/rocksdb", "reth-prune/rocksdb"]
edge = ["rocksdb"]

View File

@@ -73,12 +73,17 @@ pub struct EnvironmentArgs<C: ChainSpecParser> {
}
impl<C: ChainSpecParser> EnvironmentArgs<C> {
/// Returns the storage settings for new database initialization.
/// Returns the effective storage settings derived from `--storage.v2`.
///
/// Always returns [`StorageSettings::v2()`] — v2 is the default for all new
/// databases. Existing databases use the settings persisted in their metadata.
/// The base storage mode is determined by `--storage.v2`:
/// - When `--storage.v2` is set: uses [`StorageSettings::v2()`] defaults
/// - Otherwise: uses [`StorageSettings::base()`] defaults
pub fn storage_settings(&self) -> StorageSettings {
StorageSettings::v2()
if self.storage.v2 {
StorageSettings::v2()
} else {
StorageSettings::base()
}
}
/// Initializes environment according to [`AccessRights`] and returns an instance of

View File

@@ -21,6 +21,7 @@ use std::{
};
use tracing::{info, warn};
#[cfg(all(unix, feature = "rocksdb"))]
mod rocksdb;
/// Interval for logging progress during checksum computation.
@@ -72,6 +73,7 @@ enum Subcommand {
limit: Option<usize>,
},
/// Calculates the checksum of a RocksDB table
#[cfg(all(unix, feature = "rocksdb"))]
Rocksdb {
/// The RocksDB table
#[arg(value_enum)]
@@ -98,6 +100,7 @@ impl Command {
Subcommand::StaticFile { segment, start_block, end_block, limit } => {
checksum_static_file(tool, segment, start_block, end_block, limit)?;
}
#[cfg(all(unix, feature = "rocksdb"))]
Subcommand::Rocksdb { table, limit } => {
rocksdb::checksum_rocksdb(tool, table, limit)?;
}

View File

@@ -290,7 +290,6 @@ mod tests {
storage_version: 2,
timestamp: 0,
base_url: None,
reth_version: None,
components: BTreeMap::new(),
}
}

View File

@@ -38,9 +38,6 @@ pub struct SnapshotManifest {
/// When omitted, downloaders should derive the base URL from the manifest URL.
#[serde(default, skip_serializing_if = "Option::is_none")]
pub base_url: Option<String>,
/// Reth version that produced this snapshot.
#[serde(default, skip_serializing_if = "Option::is_none")]
pub reth_version: Option<String>,
/// Available snapshot components.
pub components: BTreeMap<String, ComponentManifest>,
}
@@ -556,7 +553,6 @@ pub fn generate_manifest(
storage_version: 2,
timestamp,
base_url: base_url.map(str::to_owned),
reth_version: Some(reth_node_core::version::version_metadata().short_version.to_string()),
components,
})
}
@@ -838,7 +834,6 @@ mod tests {
storage_version: 2,
timestamp: 0,
base_url: Some("https://example.com".to_string()),
reth_version: None,
components,
}
}
@@ -889,7 +884,6 @@ mod tests {
storage_version: 2,
timestamp: 0,
base_url: Some("https://example.com".to_string()),
reth_version: None,
components,
};
@@ -959,7 +953,6 @@ mod tests {
storage_version: 2,
timestamp: 0,
base_url: Some("https://example.com".to_string()),
reth_version: None,
components,
};
let urls = m.archive_urls(SnapshotComponentType::StorageChangesets);
@@ -1035,7 +1028,6 @@ mod tests {
storage_version: 2,
timestamp: 0,
base_url: Some("https://example.com".to_string()),
reth_version: None,
components,
};

View File

@@ -43,8 +43,7 @@ use url::Url;
use zstd::stream::read::Decoder as ZstdDecoder;
const BYTE_UNITS: [&str; 4] = ["B", "KB", "MB", "GB"];
const RETH_SNAPSHOTS_BASE_URL: &str = "https://snapshots-r2.reth.rs";
const RETH_SNAPSHOTS_API_URL: &str = "https://snapshots.reth.rs/api/snapshots";
const MERKLE_BASE_URL: &str = "https://downloads.merkle.io";
const EXTENSION_TAR_LZ4: &str = ".tar.lz4";
const EXTENSION_TAR_ZSTD: &str = ".tar.zst";
const DOWNLOAD_CACHE_DIR: &str = ".download-cache";
@@ -99,14 +98,14 @@ impl DownloadDefaults {
DOWNLOAD_DEFAULTS.get_or_init(DownloadDefaults::default_download_defaults)
}
/// Default download configuration with defaults from snapshots.reth.rs and publicnode
/// Default download configuration with defaults from merkle.io and publicnode
pub fn default_download_defaults() -> Self {
Self {
available_snapshots: vec![
Cow::Borrowed("https://snapshots.reth.rs (default)"),
Cow::Borrowed("https://www.merkle.io/snapshots (default, mainnet archive)"),
Cow::Borrowed("https://publicnode.com/snapshots (full nodes & testnets)"),
],
default_base_url: Cow::Borrowed(RETH_SNAPSHOTS_BASE_URL),
default_base_url: Cow::Borrowed(MERKLE_BASE_URL),
default_chain_aware_base_url: None,
long_help: None,
}
@@ -122,9 +121,7 @@ impl DownloadDefaults {
}
let mut help = String::from(
"Specify a snapshot URL or let the command propose a default one.\n\n\
Browse available snapshots at https://snapshots.reth.rs\n\
or use --list-snapshots to see them from the CLI.\n\nAvailable snapshot sources:\n",
"Specify a snapshot URL or let the command propose a default one.\n\nAvailable snapshot sources:\n",
);
for source in &self.available_snapshots {
@@ -191,7 +188,6 @@ pub struct DownloadCommand<C: ChainSpecParser> {
/// Custom URL to download a single snapshot archive (legacy mode).
///
/// When provided, downloads and extracts a single archive without component selection.
/// Browse available snapshots at <https://snapshots.reth.rs> or use --list-snapshots.
#[arg(long, short, long_help = DownloadDefaults::get_global().long_help())]
url: Option<String>,
@@ -218,30 +214,22 @@ pub struct DownloadCommand<C: ChainSpecParser> {
#[arg(long, alias = "with-changesets", conflicts_with_all = ["minimal", "full", "archive"])]
with_state_history: bool,
/// Include transaction sender static files. Requires `--with-txs`.
#[arg(long, requires = "with_txs", conflicts_with_all = ["minimal", "full", "archive"])]
with_senders: bool,
/// Include RocksDB index files.
#[arg(long, conflicts_with_all = ["minimal", "full", "archive", "without_rocksdb"])]
with_rocksdb: bool,
/// Download all available components (archive node, no pruning).
#[arg(long, alias = "all", conflicts_with_all = ["with_txs", "with_receipts", "with_state_history", "with_senders", "with_rocksdb", "minimal", "full"])]
#[arg(long, alias = "all", conflicts_with_all = ["with_txs", "with_receipts", "with_state_history", "minimal", "full"])]
archive: bool,
/// Download the minimal component set (same default as --non-interactive).
#[arg(long, conflicts_with_all = ["with_txs", "with_receipts", "with_state_history", "with_senders", "with_rocksdb", "archive", "full"])]
#[arg(long, conflicts_with_all = ["with_txs", "with_receipts", "with_state_history", "archive", "full"])]
minimal: bool,
/// Download the full node component set (matches default full prune settings).
#[arg(long, conflicts_with_all = ["with_txs", "with_receipts", "with_state_history", "with_senders", "with_rocksdb", "archive", "minimal"])]
#[arg(long, conflicts_with_all = ["with_txs", "with_receipts", "with_state_history", "archive", "minimal"])]
full: bool,
/// Skip optional RocksDB indices even when archive components are selected.
///
/// This affects `--archive`/`--all` and TUI archive preset (`a`).
#[arg(long, conflicts_with_all = ["url", "with_rocksdb"])]
#[arg(long, conflicts_with = "url")]
without_rocksdb: bool,
/// Skip interactive component selection. Downloads the minimal set
@@ -260,13 +248,6 @@ pub struct DownloadCommand<C: ChainSpecParser> {
/// Maximum number of concurrent modular archive workers.
#[arg(long, default_value_t = MAX_CONCURRENT_DOWNLOADS)]
download_concurrency: usize,
/// List available snapshots from snapshots.reth.rs and exit.
///
/// Queries the snapshots API and prints all available snapshots for the selected chain,
/// including block number, size, and manifest URL.
#[arg(long, alias = "list-snapshots", conflicts_with_all = ["url", "manifest_url", "manifest_path"])]
list: bool,
}
impl<C: ChainSpecParser<ChainSpec: EthChainSpec + EthereumHardforks>> DownloadCommand<C> {
@@ -279,15 +260,8 @@ impl<C: ChainSpecParser<ChainSpec: EthChainSpec + EthereumHardforks>> DownloadCo
let cancel_token = CancellationToken::new();
let _cancel_guard = cancel_token.drop_guard();
// --list: print available snapshots and exit
if self.list {
let entries = fetch_snapshot_api_entries(chain_id).await?;
print_snapshot_listing(&entries, chain_id);
return Ok(());
}
// Legacy single-URL mode: download one archive and extract it
if let Some(ref url) = self.url {
if let Some(url) = self.url {
info!(target: "reth::cli",
dir = ?data_dir.data_dir(),
url = %url,
@@ -295,7 +269,7 @@ impl<C: ChainSpecParser<ChainSpec: EthChainSpec + EthereumHardforks>> DownloadCo
);
stream_and_extract(
url,
&url,
data_dir.data_dir(),
None,
self.resumable,
@@ -308,7 +282,7 @@ impl<C: ChainSpecParser<ChainSpec: EthChainSpec + EthereumHardforks>> DownloadCo
}
// Modular download: fetch manifest and select components
let manifest_source = self.resolve_manifest_source(chain_id).await?;
let manifest_source = self.resolve_manifest_source(chain_id);
info!(target: "reth::cli", source = %manifest_source, "Fetching snapshot manifest");
let mut manifest = fetch_manifest_from_source(&manifest_source).await?;
@@ -512,11 +486,7 @@ impl<C: ChainSpecParser<ChainSpec: EthChainSpec + EthereumHardforks>> DownloadCo
});
}
let has_explicit_flags = self.with_txs ||
self.with_receipts ||
self.with_state_history ||
self.with_senders ||
self.with_rocksdb;
let has_explicit_flags = self.with_txs || self.with_receipts || self.with_state_history;
if has_explicit_flags {
let mut selections = BTreeMap::new();
@@ -543,13 +513,6 @@ impl<C: ChainSpecParser<ChainSpec: EthChainSpec + EthereumHardforks>> DownloadCo
.insert(SnapshotComponentType::StorageChangesets, ComponentSelection::All);
}
}
if self.with_senders && available(SnapshotComponentType::TransactionSenders) {
selections
.insert(SnapshotComponentType::TransactionSenders, ComponentSelection::All);
}
if self.with_rocksdb && available(SnapshotComponentType::RocksdbIndices) {
selections.insert(SnapshotComponentType::RocksdbIndices, ComponentSelection::All);
}
return Ok(ResolvedComponents { selections, preset: None });
}
@@ -658,14 +621,17 @@ impl<C: ChainSpecParser<ChainSpec: EthChainSpec + EthereumHardforks>> DownloadCo
}
}
async fn resolve_manifest_source(&self, chain_id: u64) -> Result<String> {
fn resolve_manifest_source(&self, chain_id: u64) -> String {
if let Some(path) = &self.manifest_path {
return Ok(path.display().to_string());
return path.display().to_string();
}
match &self.manifest_url {
Some(url) => Ok(url.clone()),
None => discover_manifest_url(chain_id).await,
Some(url) => url.clone(),
None => {
let base_url = get_base_url(chain_id);
format!("{base_url}/manifest.json")
}
}
}
}
@@ -1606,128 +1572,15 @@ fn file_blake3_hex(path: &Path) -> Result<String> {
Ok(hasher.finalize().to_hex().to_string())
}
/// Looks up the manifest URL of the newest modular snapshot for `chain_id`.
///
/// The listing is obtained through [`fetch_snapshot_api_entries`]; among the entries for which
/// `is_modular` holds, the one with the highest `block` wins.
///
/// # Errors
///
/// Fails when the listing cannot be fetched or when it holds no modular snapshot for the chain.
async fn discover_manifest_url(chain_id: u64) -> Result<String> {
    let api_url = RETH_SNAPSHOTS_API_URL;
    info!(target: "reth::cli", %api_url, %chain_id, "Discovering latest snapshot manifest");

    let listing = fetch_snapshot_api_entries(chain_id).await?;
    let newest = listing.into_iter().filter(|s| s.is_modular()).max_by_key(|s| s.block);

    // Guard clause: without any modular entry there is nothing to return.
    let Some(latest) = newest else {
        return Err(eyre::eyre!(
            "No modular snapshot manifest found for chain \
             {chain_id} at {api_url}\n\n\
             You can provide a manifest URL directly with --manifest-url, or\n\
             use a direct snapshot URL with -u from:\n\
             \t- https://snapshots.reth.rs\n\n\
             Use --list to see all available snapshots."
        ));
    };

    info!(target: "reth::cli",
        block = latest.block,
        url = %latest.metadata_url,
        "Found latest snapshot manifest"
    );
    Ok(latest.metadata_url)
}
/// Deserializes a JSON value that may be either a number or a string-encoded number.
fn deserialize_string_or_u64<'de, D>(deserializer: D) -> std::result::Result<u64, D::Error>
where
D: serde::Deserializer<'de>,
{
use serde::Deserialize;
let value = serde_json::Value::deserialize(deserializer)?;
match &value {
serde_json::Value::Number(n) => {
n.as_u64().ok_or_else(|| serde::de::Error::custom("expected u64"))
}
serde_json::Value::String(s) => {
s.parse::<u64>().map_err(|_| serde::de::Error::custom("expected numeric string"))
}
_ => Err(serde::de::Error::custom("expected number or string")),
/// Builds the base URL for the given chain ID using configured defaults.
fn get_base_url(chain_id: u64) -> String {
let defaults = DownloadDefaults::get_global();
match &defaults.default_chain_aware_base_url {
Some(url) => format!("{url}/{chain_id}"),
None => defaults.default_base_url.to_string(),
}
}
/// An entry from the `snapshots.reth.rs/api/snapshots` listing.
///
/// Fields are read from camelCase JSON keys (for example `metadata_url` comes from
/// `metadataUrl`).
#[derive(serde::Deserialize)]
#[serde(rename_all = "camelCase")]
struct SnapshotApiEntry {
/// Chain ID of the snapshot, used to filter the listing per chain. Decoded via
/// `deserialize_string_or_u64`.
#[serde(deserialize_with = "deserialize_string_or_u64")]
chain_id: u64,
/// Block value of the snapshot; the highest value is treated as the most recent snapshot.
/// Decoded via `deserialize_string_or_u64`.
#[serde(deserialize_with = "deserialize_string_or_u64")]
block: u64,
/// Optional date label; `None` when the key is absent.
#[serde(default)]
date: Option<String>,
/// Optional profile label; `None` when the key is absent.
#[serde(default)]
profile: Option<String>,
/// URL of the snapshot's metadata document.
metadata_url: String,
/// Snapshot size; `0` when the key is absent. Presumably in bytes — TODO confirm.
#[serde(default)]
size: u64,
}
impl SnapshotApiEntry {
/// Returns `true` when `metadata_url` ends with `manifest.json`, which is how modular
/// snapshots are recognized.
fn is_modular(&self) -> bool {
self.metadata_url.ends_with("manifest.json")
}
}
/// Downloads the snapshot listing from the snapshots API and keeps only the entries whose
/// `chain_id` equals the requested one.
///
/// # Errors
///
/// Fails when the request cannot be sent, the server answers with an error status, or the
/// response body cannot be decoded as a JSON list of entries.
async fn fetch_snapshot_api_entries(chain_id: u64) -> Result<Vec<SnapshotApiEntry>> {
    let api_url = RETH_SNAPSHOTS_API_URL;

    // Transport failures and HTTP error statuses share the same error context.
    let response = Client::new()
        .get(api_url)
        .send()
        .await
        .and_then(|resp| resp.error_for_status())
        .wrap_err_with(|| format!("Failed to fetch snapshot listing from {api_url}"))?;

    let mut listing: Vec<SnapshotApiEntry> = response.json().await?;
    listing.retain(|entry| entry.chain_id == chain_id);
    Ok(listing)
}
/// Writes a table of the modular snapshots found in `entries` to stdout.
///
/// Entries for which `is_modular` is false are left out. `chain_id` is only used in the heading.
/// A placeholder row is printed when no modular entry exists, and a usage hint always closes
/// the output.
fn print_snapshot_listing(entries: &[SnapshotApiEntry], chain_id: u64) {
    let mut rows = entries.iter().filter(|e| e.is_modular()).peekable();
    // Decide up front whether the placeholder row is needed; the loop below consumes `rows`.
    let nothing_to_show = rows.peek().is_none();

    println!("Available snapshots for chain {chain_id} (https://snapshots.reth.rs):\n");
    println!("{:<12} {:>10} {:<10} {:>10} MANIFEST URL", "DATE", "BLOCK", "PROFILE", "SIZE");
    println!("{}", "-".repeat(100));

    for row in rows {
        let date = row.date.as_deref().unwrap_or("-");
        let profile = row.profile.as_deref().unwrap_or("-");
        // A size of zero is shown as a dash.
        let size = match row.size {
            0 => "-".to_string(),
            known => DownloadProgress::format_size(known),
        };
        println!("{date:<12} {:>10} {profile:<10} {size:>10} {}", row.block, row.metadata_url);
    }

    if nothing_to_show {
        println!(" (no modular snapshots found)");
    }

    println!(
        "\nTo download a specific snapshot, copy its manifest URL and run:\n \
         reth download --manifest-url <URL>"
    );
}
async fn fetch_manifest_from_source(source: &str) -> Result<SnapshotManifest> {
if let Ok(parsed) = Url::parse(source) {
return match parsed.scheme() {
@@ -1744,7 +1597,7 @@ async fn fetch_manifest_from_source(source: &str) -> Result<SnapshotManifest> {
You can use a direct snapshot URL instead:\n\n\
\treth download -u <snapshot-url>\n\n\
Available snapshot sources:\n\
\t- https://snapshots.reth.rs\n\
\t- https://www.merkle.io/snapshots\n\
\t- https://publicnode.com/snapshots"
)
})?;
@@ -1813,6 +1666,26 @@ fn resolve_manifest_base_url(manifest: &SnapshotManifest, source: &str) -> Resul
Ok(base)
}
/// Builds default URL for latest mainnet archive snapshot using configured defaults.
///
/// Used by the legacy single-archive download flow when no manifest is available.
#[allow(dead_code)]
async fn get_latest_snapshot_url(chain_id: u64) -> Result<String> {
let base_url = get_base_url(chain_id);
let latest_url = format!("{base_url}/latest.txt");
let filename = Client::new()
.get(latest_url)
.send()
.await?
.error_for_status()?
.text()
.await?
.trim()
.to_string();
Ok(format!("{base_url}/{filename}"))
}
#[cfg(test)]
mod tests {
use super::*;
@@ -1845,7 +1718,6 @@ mod tests {
storage_version: 2,
timestamp: 0,
base_url: Some("https://example.com".to_string()),
reth_version: None,
components,
}
}
@@ -1877,7 +1749,7 @@ mod tests {
let help = defaults.long_help();
assert!(help.contains("Available snapshot sources:"));
assert!(help.contains("snapshots.reth.rs"));
assert!(help.contains("merkle.io"));
assert!(help.contains("publicnode.com"));
assert!(help.contains("file://"));
}

View File

@@ -19,12 +19,11 @@ use reth_node_api::BlockTy;
use reth_node_events::node::NodeEvent;
use reth_provider::{
providers::ProviderNodeTypes, BlockNumReader, HeaderProvider, ProviderError, ProviderFactory,
RocksDBProviderFactory, StageCheckpointReader,
StageCheckpointReader,
};
use reth_prune::PruneModes;
use reth_stages::{prelude::*, ControlFlow, Pipeline, StageId, StageSet};
use reth_static_file::StaticFileProducer;
use reth_storage_api::StorageSettingsCache;
use std::{path::Path, sync::Arc};
use tokio::sync::watch;
use tracing::{debug, error, info, warn};
@@ -109,11 +108,7 @@ where
let provider = provider_factory.provider()?;
let init_blocks = provider.tx_ref().entries::<tables::HeaderNumbers>()?;
let init_txns = if provider_factory.cached_storage_settings().storage_v2 {
provider_factory.rocksdb_provider().iter::<tables::TransactionHashNumbers>()?.count()
} else {
provider.tx_ref().entries::<tables::TransactionHashNumbers>()?
};
let init_txns = provider.tx_ref().entries::<tables::TransactionHashNumbers>()?;
drop(provider);
let mut total_decoded_blocks = 0;
@@ -220,12 +215,8 @@ where
let provider = provider_factory.provider()?;
let total_imported_blocks = provider.tx_ref().entries::<tables::HeaderNumbers>()? - init_blocks;
let current_txns = if provider_factory.cached_storage_settings().storage_v2 {
provider_factory.rocksdb_provider().iter::<tables::TransactionHashNumbers>()?.count()
} else {
provider.tx_ref().entries::<tables::TransactionHashNumbers>()?
};
let total_imported_txns = current_txns - init_txns;
let total_imported_txns =
provider.tx_ref().entries::<tables::TransactionHashNumbers>()? - init_txns;
let result = ImportResult {
total_decoded_blocks,

View File

@@ -193,10 +193,7 @@ impl<C: ChainSpecParser> DownloadArgs<C> {
let default_secret_key_path = data_dir.p2p_secret();
let p2p_secret_key = self.network.secret_key(default_secret_key_path)?;
let rlpx_socket = (self.network.addr, self.network.port).into();
let boot_nodes = self
.network
.resolved_bootnodes()
.unwrap_or_else(|| self.chain.bootnodes().unwrap_or_default());
let boot_nodes = self.chain.bootnodes().unwrap_or_default();
let net =
NetworkConfigBuilder::<N::NetworkPrimitives>::new(p2p_secret_key, Runtime::test())

View File

@@ -12,6 +12,7 @@ use reth_node_metrics::{
server::{MetricServer, MetricServerConfig},
version::VersionInfo,
};
#[cfg(all(unix, feature = "rocksdb"))]
use reth_provider::RocksDBProviderFactory;
use reth_prune::PrunerBuilder;
use reth_static_file::StaticFileProducer;
@@ -121,6 +122,7 @@ impl<C: ChainSpecParser<ChainSpec: EthChainSpec + EthereumHardforks>> PruneComma
}
// Flush and compact RocksDB to reclaim disk space after pruning
#[cfg(all(unix, feature = "rocksdb"))]
{
info!(target: "reth::cli", "Flushing and compacting RocksDB...");
provider_factory.rocksdb_provider().flush_and_compact()?;

View File

@@ -75,3 +75,8 @@ path = "tests/e2e-testsuite/main.rs"
[[test]]
name = "rocksdb"
path = "tests/rocksdb/main.rs"
required-features = ["rocksdb"]
[features]
rocksdb = ["reth-node-core/rocksdb", "reth-provider/rocksdb", "reth-cli-commands/rocksdb"]
edge = ["rocksdb"]

View File

@@ -1,5 +1,7 @@
//! E2E tests for `RocksDB` provider functionality.
#![cfg(all(feature = "rocksdb", unix))]
use alloy_consensus::BlockHeader;
use alloy_primitives::B256;
use alloy_rpc_types_eth::{Transaction, TransactionReceipt};

View File

@@ -146,8 +146,6 @@ pub struct TreeConfig {
slow_block_threshold: Option<Duration>,
/// Whether to fully disable sparse trie cache pruning between blocks.
disable_sparse_trie_cache_pruning: bool,
/// Whether to use the arena-based sparse trie implementation.
enable_arena_sparse_trie: bool,
/// Timeout for the state root task before spawning a sequential fallback computation.
/// If `Some`, after waiting this duration for the state root task, a sequential state root
/// computation is spawned in parallel and whichever finishes first is used.
@@ -187,7 +185,6 @@ impl Default for TreeConfig {
sparse_trie_max_hot_accounts: DEFAULT_SPARSE_TRIE_MAX_HOT_ACCOUNTS,
slow_block_threshold: None,
disable_sparse_trie_cache_pruning: false,
enable_arena_sparse_trie: false,
state_root_task_timeout: Some(DEFAULT_STATE_ROOT_TASK_TIMEOUT),
#[cfg(feature = "trie-debug")]
proof_jitter: None,
@@ -249,7 +246,6 @@ impl TreeConfig {
sparse_trie_max_hot_accounts,
slow_block_threshold,
disable_sparse_trie_cache_pruning: false,
enable_arena_sparse_trie: false,
state_root_task_timeout,
#[cfg(feature = "trie-debug")]
proof_jitter: None,
@@ -552,17 +548,6 @@ impl TreeConfig {
self
}
/// Returns whether the arena-based sparse trie is enabled.
///
/// This is a plain read of the `enable_arena_sparse_trie` field.
pub const fn enable_arena_sparse_trie(&self) -> bool {
self.enable_arena_sparse_trie
}
/// Setter for whether to enable the arena-based sparse trie.
///
/// Builder-style: takes the configuration by value, overwrites the flag with `value`, and
/// returns the updated configuration so calls can be chained.
pub const fn with_enable_arena_sparse_trie(mut self, value: bool) -> Self {
self.enable_arena_sparse_trie = value;
self
}
/// Returns the state root task timeout.
pub const fn state_root_task_timeout(&self) -> Option<Duration> {
self.state_root_task_timeout

View File

@@ -139,6 +139,13 @@ trie-debug = [
"reth-engine-primitives/trie-debug",
"dep:serde_json",
]
rocksdb = [
"reth-provider/rocksdb",
"reth-prune/rocksdb",
"reth-stages?/rocksdb",
"reth-e2e-test-utils/rocksdb",
]
edge = ["rocksdb"]
[[test]]
name = "e2e_testsuite"

View File

@@ -38,7 +38,9 @@ use reth_trie_parallel::{
proof_task::{ProofTaskCtx, ProofWorkerHandle},
root::ParallelStateRootError,
};
use reth_trie_sparse::ParallelismThresholds;
use reth_trie_sparse::{
ParallelSparseTrie, ParallelismThresholds, RevealableSparseTrie, SparseStateTrie,
};
use std::{
ops::Not,
sync::{
@@ -57,17 +59,14 @@ pub mod prewarm;
pub mod receipt_root_task;
pub mod sparse_trie;
pub use preserved_sparse_trie::{
PayloadSparseTrieCache, PayloadSparseTrieKind, PayloadSparseTrieStoreOutcome,
SparseTrieCheckout,
};
use preserved_sparse_trie::{PreservedSparseTrie, SharedPreservedSparseTrie};
/// Default parallelism thresholds to use with the [`ParallelSparseTrie`].
///
/// These values were determined by performing benchmarks using gradually increasing values to judge
/// the effects. Below 100 throughput would generally be equal or slightly less, while above 150 it
/// the effects. Below 100 throughput would generally be equal or slightly less, while above 150 it
/// would deteriorate to the point where PST might as well not be used.
const PARALLEL_SPARSE_TRIE_PARALLELISM_THRESHOLDS: ParallelismThresholds =
pub const PARALLEL_SPARSE_TRIE_PARALLELISM_THRESHOLDS: ParallelismThresholds =
ParallelismThresholds { min_revealed_nodes: 100, min_updated_nodes: 100 };
/// Default node capacity for shrinking the sparse trie. This is used to limit the number of trie
@@ -104,52 +103,6 @@ type IteratorPayloadHandle<Evm, I, N> = PayloadHandle<
<N as NodePrimitives>::Receipt,
>;
/// Shared cache handles that can be exported to engine consumers and downstream payload builders.
#[derive(Debug, Clone)]
pub struct EngineSharedCaches<Evm: ConfigureEvm> {
execution_cache: PayloadExecutionCache,
sparse_trie_cache: PayloadSparseTrieCache,
precompile_cache_map: PrecompileCacheMap<SpecFor<Evm>>,
}
impl<Evm> Default for EngineSharedCaches<Evm>
where
Evm: ConfigureEvm,
{
fn default() -> Self {
Self::with_sparse_trie_kind(PayloadSparseTrieKind::default())
}
}
impl<Evm> EngineSharedCaches<Evm>
where
Evm: ConfigureEvm,
{
/// Creates shared caches backed by the requested sparse trie implementation.
pub fn with_sparse_trie_kind(sparse_trie_kind: PayloadSparseTrieKind) -> Self {
Self {
execution_cache: Default::default(),
sparse_trie_cache: PayloadSparseTrieCache::new(sparse_trie_kind),
precompile_cache_map: Default::default(),
}
}
/// Returns the shared execution cache handle for engine-internal use.
pub(crate) fn execution_cache(&self) -> PayloadExecutionCache {
self.execution_cache.clone()
}
/// Returns the shared sparse trie cache handle.
pub fn sparse_trie_cache(&self) -> PayloadSparseTrieCache {
self.sparse_trie_cache.clone()
}
/// Returns the shared precompile cache map.
pub fn precompile_cache_map(&self) -> PrecompileCacheMap<SpecFor<Evm>> {
self.precompile_cache_map.clone()
}
}
/// Entrypoint for executing the payload.
#[derive(Debug)]
pub struct PayloadProcessor<Evm>
@@ -158,8 +111,8 @@ where
{
/// The executor used by to spawn tasks.
executor: Runtime,
/// Shared caches reused across payload processing.
shared_caches: EngineSharedCaches<Evm>,
/// The most recent cache used for execution.
execution_cache: PayloadExecutionCache,
/// Metrics for trie operations
trie_metrics: MultiProofTaskMetrics,
/// Cross-block cache size in bytes.
@@ -172,6 +125,12 @@ where
evm_config: Evm,
/// Whether precompile cache should be disabled.
precompile_cache_disabled: bool,
/// Precompile cache map.
precompile_cache_map: PrecompileCacheMap<SpecFor<Evm>>,
/// A pruned `SparseStateTrie`, kept around as a cache of already revealed trie nodes and to
/// re-use allocated memory. Stored with the block hash it was computed for to enable trie
/// preservation across sequential payload validations.
sparse_state_trie: SharedPreservedSparseTrie,
/// LFU hot-slot capacity: max storage slots retained across prune cycles.
sparse_trie_max_hot_slots: usize,
/// LFU hot-account capacity: max account addresses retained across prune cycles.
@@ -197,17 +156,19 @@ where
executor: Runtime,
evm_config: Evm,
config: &TreeConfig,
shared_caches: EngineSharedCaches<Evm>,
precompile_cache_map: PrecompileCacheMap<SpecFor<Evm>>,
) -> Self {
Self {
executor,
shared_caches,
execution_cache: Default::default(),
trie_metrics: Default::default(),
cross_block_cache_size: config.cross_block_cache_size(),
disable_transaction_prewarming: config.disable_prewarming(),
evm_config,
disable_state_cache: config.disable_state_cache(),
precompile_cache_disabled: config.precompile_cache_disabled(),
precompile_cache_map,
sparse_state_trie: SharedPreservedSparseTrie::default(),
sparse_trie_max_hot_slots: config.sparse_trie_max_hot_slots(),
sparse_trie_max_hot_accounts: config.sparse_trie_max_hot_accounts(),
disable_sparse_trie_cache_pruning: config.disable_sparse_trie_cache_pruning(),
@@ -224,8 +185,8 @@ where
debug!(target: "engine::tree::payload_processor", "Waiting for execution cache and sparse trie locks");
// Wait for both caches in parallel using std threads
let execution_cache = self.shared_caches.execution_cache();
let sparse_trie = self.shared_caches.sparse_trie_cache();
let execution_cache = self.execution_cache.clone();
let sparse_trie = self.sparse_state_trie.clone();
// Use channels and spawn_blocking instead of std::thread::spawn
let (execution_tx, execution_rx) = std::sync::mpsc::channel();
@@ -539,12 +500,12 @@ where
terminate_execution: Arc::new(AtomicBool::new(false)),
executed_tx_index: Arc::clone(&executed_tx_index),
precompile_cache_disabled: self.precompile_cache_disabled,
precompile_cache_map: self.shared_caches.precompile_cache_map(),
precompile_cache_map: self.precompile_cache_map.clone(),
};
let (prewarm_task, to_prewarm_task) = PrewarmCacheTask::new(
self.executor.clone(),
self.shared_caches.execution_cache(),
self.execution_cache.clone(),
prewarm_ctx,
to_multi_proof,
);
@@ -572,7 +533,7 @@ where
/// instance.
#[instrument(level = "debug", target = "engine::caching", skip(self))]
fn cache_for(&self, parent_hash: B256) -> SavedCache {
if let Some(cache) = self.shared_caches.execution_cache().get_cache_for(parent_hash) {
if let Some(cache) = self.execution_cache.get_cache_for(parent_hash) {
debug!("reusing execution cache");
cache
} else {
@@ -597,7 +558,7 @@ where
parent_state_root: B256,
chunk_size: usize,
) {
let sparse_trie_cache = self.shared_caches.sparse_trie_cache();
let preserved_sparse_trie = self.sparse_state_trie.clone();
let trie_metrics = self.trie_metrics.clone();
let max_hot_slots = self.sparse_trie_max_hot_slots;
let max_hot_accounts = self.sparse_trie_max_hot_accounts;
@@ -611,19 +572,41 @@ where
let _enter = debug_span!(target: "engine::tree::payload_processor", parent: parent_span, "sparse_trie_task")
.entered();
// Reuse a stored SparseStateTrie if available, applying continuation logic.
// If this payload's parent state root matches the preserved trie's anchor,
// we can reuse the pruned trie structure. Otherwise, we clear the trie but
// keep allocations.
let start = Instant::now();
let mut checkout = sparse_trie_cache.take_or_create_for(parent_state_root);
let preserved = preserved_sparse_trie.take();
trie_metrics
.sparse_trie_cache_wait_duration_histogram
.record(start.elapsed().as_secs_f64());
checkout.set_hot_cache_capacities(max_hot_slots, max_hot_accounts);
let mut task = SparseTrieCacheTask::new_with_checkout(
let mut sparse_state_trie = preserved
.map(|preserved| preserved.into_trie_for(parent_state_root))
.unwrap_or_else(|| {
debug!(
target: "engine::tree::payload_processor",
"Creating new sparse trie - no preserved trie available"
);
let default_trie = RevealableSparseTrie::blind_from(
ParallelSparseTrie::default().with_parallelism_thresholds(
PARALLEL_SPARSE_TRIE_PARALLELISM_THRESHOLDS,
),
);
SparseStateTrie::new()
.with_accounts_trie(default_trie.clone())
.with_default_storage_trie(default_trie)
.with_updates(true)
});
sparse_state_trie.set_hot_cache_capacities(max_hot_slots, max_hot_accounts);
let mut task = SparseTrieCacheTask::new_with_trie(
&executor,
from_multi_proof,
proof_worker_handle,
trie_metrics.clone(),
checkout,
sparse_state_trie,
chunk_size,
);
@@ -634,7 +617,7 @@ where
// causing take() to return None and forcing it to create a new empty trie
// instead of reusing the preserved one. Holding the guard ensures the next
// block's take() blocks until we've stored the trie for reuse.
let mut guard = sparse_trie_cache.lock();
let mut guard = preserved_sparse_trie.lock();
let task_result = result.as_ref().ok().cloned();
// Send state root computation result - next block may start but will block on take()
@@ -649,9 +632,10 @@ where
SPARSE_TRIE_MAX_NODES_SHRINK_CAPACITY,
SPARSE_TRIE_MAX_VALUES_SHRINK_CAPACITY,
);
trie.store_prepared_cleared_with_guard(&mut guard);
guard.store(PreservedSparseTrie::cleared(trie));
// Drop guard before deferred to release lock before expensive deallocations
drop(guard);
executor.spawn_drop(deferred);
drop(deferred);
return;
}
@@ -678,7 +662,7 @@ where
trie_metrics
.sparse_trie_retained_storage_tries
.set(trie.retained_storage_tries_count() as f64);
trie.store_anchored_with_guard(&mut guard, result.state_root);
guard.store(PreservedSparseTrie::anchored(trie, result.state_root));
deferred
} else {
debug!(
@@ -689,11 +673,12 @@ where
SPARSE_TRIE_MAX_NODES_SHRINK_CAPACITY,
SPARSE_TRIE_MAX_VALUES_SHRINK_CAPACITY,
);
trie.store_prepared_cleared_with_guard(&mut guard);
guard.store(PreservedSparseTrie::cleared(trie));
deferred
};
// Drop guard before deferred to release lock before expensive deallocations
drop(guard);
executor.spawn_drop(deferred);
drop(deferred);
});
}
@@ -710,7 +695,7 @@ where
bundle_state: &BundleState,
) {
let disable_cache_metrics = self.disable_cache_metrics;
self.shared_caches.execution_cache().update_with_guard(|cached| {
self.execution_cache.update_with_guard(|cached| {
if cached.as_ref().is_some_and(|c| c.executed_block_hash() != block_with_parent.parent) {
debug!(
target: "engine::caching",
@@ -1014,7 +999,7 @@ impl<R> Drop for CacheTaskHandle<R> {
/// - Prepares data for state root proof computation
/// - Runs concurrently but must not interfere with cache saves
#[derive(Clone, Debug, Default)]
pub(crate) struct PayloadExecutionCache {
pub struct PayloadExecutionCache {
/// Guarded cloneable cache identified by a block hash.
inner: Arc<RwLock<Option<SavedCache>>>,
/// Metrics for cache operations.
@@ -1022,11 +1007,11 @@ pub(crate) struct PayloadExecutionCache {
}
impl PayloadExecutionCache {
/// Returns the cache backing store for `parent_hash` if it's available for reuse.
/// Returns the cache for `parent_hash` if it's available for use.
///
/// If the tracked cache is available but keyed to a different parent hash, the cache is
/// cleared and returned so callers can reuse the underlying allocations without carrying over
/// stale state.
/// A cache is considered available when:
/// - It exists and matches the requested parent hash
/// - No other tasks are currently using it (checked via Arc reference count)
#[instrument(level = "debug", target = "engine::tree::payload_processor", skip(self))]
pub(crate) fn get_cache_for(&self, parent_hash: B256) -> Option<SavedCache> {
let start = Instant::now();
@@ -1065,7 +1050,7 @@ impl PayloadExecutionCache {
// and picking up polluted data if the fork block fails.
c.clear_with_hash(parent_hash);
}
return Some(c.clone());
return Some(c.clone())
} else if hash_matches {
self.metrics.execution_cache_in_use.increment(1);
}
@@ -1082,7 +1067,7 @@ impl PayloadExecutionCache {
/// This is useful for synchronization before starting payload processing.
///
/// Returns the time spent waiting for the lock.
pub(crate) fn wait_for_availability(&self) -> Duration {
pub fn wait_for_availability(&self) -> Duration {
let start = Instant::now();
// Acquire write lock to wait for any current holders to finish
let _guard = self.inner.write();
@@ -1110,7 +1095,7 @@ impl PayloadExecutionCache {
///
/// Violating this requirement can result in cache corruption, incorrect state data,
/// and potential consensus failures.
pub(crate) fn update_with_guard<F>(&self, update_fn: F)
pub fn update_with_guard<F>(&self, update_fn: F)
where
F: FnOnce(&mut Option<SavedCache>),
{
@@ -1179,9 +1164,8 @@ mod tests {
use super::PayloadExecutionCache;
use crate::tree::{
cached_state::{CachedStateMetrics, ExecutionCache, SavedCache},
payload_processor::{
evm_state_to_hashed_post_state, EngineSharedCaches, ExecutionEnv, PayloadProcessor,
},
payload_processor::{evm_state_to_hashed_post_state, ExecutionEnv, PayloadProcessor},
precompile_cache::PrecompileCacheMap,
StateProviderBuilder, TreeConfig,
};
use alloy_eips::eip1898::{BlockNumHash, BlockWithParent};
@@ -1293,7 +1277,7 @@ mod tests {
reth_tasks::Runtime::test(),
EthEvmConfig::new(Arc::new(ChainSpec::default())),
&TreeConfig::default(),
EngineSharedCaches::default(),
PrecompileCacheMap::default(),
);
let parent_hash = B256::from([1u8; 32]);
@@ -1305,17 +1289,13 @@ mod tests {
let bundle_state = BundleState::default();
// Cache should be empty initially
assert!(payload_processor
.shared_caches
.execution_cache()
.get_cache_for(block_hash)
.is_none());
assert!(payload_processor.execution_cache.get_cache_for(block_hash).is_none());
// Update cache with inserted block
payload_processor.on_inserted_executed_block(block_with_parent, &bundle_state);
// Cache should now exist for the block hash
let cached = payload_processor.shared_caches.execution_cache().get_cache_for(block_hash);
let cached = payload_processor.execution_cache.get_cache_for(block_hash);
assert!(cached.is_some());
assert_eq!(cached.unwrap().executed_block_hash(), block_hash);
}
@@ -1326,14 +1306,13 @@ mod tests {
reth_tasks::Runtime::test(),
EthEvmConfig::new(Arc::new(ChainSpec::default())),
&TreeConfig::default(),
EngineSharedCaches::default(),
PrecompileCacheMap::default(),
);
// Setup: populate cache with block 1
let block1_hash = B256::from([1u8; 32]);
payload_processor
.shared_caches
.execution_cache()
.execution_cache
.update_with_guard(|slot| *slot = Some(make_saved_cache(block1_hash)));
// Try to insert block 3 with wrong parent (should skip and keep block 1's cache)
@@ -1348,11 +1327,11 @@ mod tests {
payload_processor.on_inserted_executed_block(block_with_parent, &bundle_state);
// Cache should still be for block 1 (unchanged)
let cached = payload_processor.shared_caches.execution_cache().get_cache_for(block1_hash);
let cached = payload_processor.execution_cache.get_cache_for(block1_hash);
assert!(cached.is_some(), "Original cache should be preserved");
// Cache for block 3 should not exist
let cached3 = payload_processor.shared_caches.execution_cache().get_cache_for(block3_hash);
let cached3 = payload_processor.execution_cache.get_cache_for(block3_hash);
assert!(cached3.is_none(), "New block cache should not be created on mismatch");
}
@@ -1462,7 +1441,7 @@ mod tests {
reth_tasks::Runtime::test(),
EthEvmConfig::new(factory.chain_spec()),
&TreeConfig::default(),
EngineSharedCaches::default(),
PrecompileCacheMap::default(),
);
let provider_factory = BlockchainProvider::new(factory).unwrap();

View File

@@ -1,128 +1,44 @@
//! Preserved sparse trie for reuse across payload validations.
use super::{
PARALLEL_SPARSE_TRIE_PARALLELISM_THRESHOLDS, SPARSE_TRIE_MAX_NODES_SHRINK_CAPACITY,
SPARSE_TRIE_MAX_VALUES_SHRINK_CAPACITY,
};
use alloy_primitives::B256;
use parking_lot::Mutex;
use reth_trie_sparse::{
ArenaParallelSparseTrie, ConfigurableSparseTrie, ParallelSparseTrie, RevealableSparseTrie,
SparseStateTrie,
};
use std::{
ops::{Deref, DerefMut},
sync::Arc,
time::{Duration, Instant},
};
use reth_trie_sparse::SparseStateTrie;
use std::{sync::Arc, time::Instant};
use tracing::debug;
/// Type alias for the sparse trie type used in preservation.
type SparseTrie = SparseStateTrie<ConfigurableSparseTrie, ConfigurableSparseTrie>;
pub(super) type SparseTrie = SparseStateTrie;
/// Sparse trie implementation used by [`PayloadSparseTrieCache`].
#[derive(Debug, Default, Clone, Copy, PartialEq, Eq)]
pub enum PayloadSparseTrieKind {
/// Back sparse trie storage with hash maps.
#[default]
HashMap,
/// Back sparse trie storage with arena allocations.
Arena,
}
impl From<bool> for PayloadSparseTrieKind {
fn from(enable_arena_sparse_trie: bool) -> Self {
if enable_arena_sparse_trie {
Self::Arena
} else {
Self::HashMap
}
}
}
#[derive(Debug, Default)]
struct PayloadSparseTrieState {
latest_checkout_id: u64,
preserved: Option<PreservedSparseTrie>,
}
/// Outcome of storing a checked-out sparse trie back into the shared cache.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum PayloadSparseTrieStoreOutcome {
/// The checkout was the most recent lease and the trie was stored.
Stored,
/// A newer checkout had already been issued, so this stale lease was ignored.
IgnoredStaleCheckout,
}
/// Shared sparse trie cache that can be reused across payload validations.
/// Shared handle to a preserved sparse trie that can be reused across payload validations.
///
/// This is the public sparse-trie SDK surface exposed through
/// [`EngineSharedCaches`](super::EngineSharedCaches). Callers take or create a trie, use it for
/// payload work, then store it back either anchored to the resulting state root or cleared for
/// allocation reuse.
#[derive(Debug, Clone)]
pub struct PayloadSparseTrieCache {
kind: PayloadSparseTrieKind,
state: Arc<Mutex<PayloadSparseTrieState>>,
}
/// This is stored in [`PayloadProcessor`](super::PayloadProcessor) and cloned to pass to
/// [`SparseTrieCacheTask`](super::sparse_trie::SparseTrieCacheTask) for trie reuse.
#[derive(Debug, Default, Clone)]
pub(super) struct SharedPreservedSparseTrie(Arc<Mutex<Option<PreservedSparseTrie>>>);
impl Default for PayloadSparseTrieCache {
fn default() -> Self {
Self::new(PayloadSparseTrieKind::default())
}
}
impl PayloadSparseTrieCache {
/// Creates a sparse trie cache backed by the requested trie implementation.
pub fn new(kind: PayloadSparseTrieKind) -> Self {
Self { kind, state: Arc::new(Mutex::new(PayloadSparseTrieState::default())) }
impl SharedPreservedSparseTrie {
/// Takes the preserved trie if present, leaving `None` in its place.
pub(super) fn take(&self) -> Option<PreservedSparseTrie> {
self.0.lock().take()
}
/// Returns the sparse trie implementation used when the cache needs to create a new trie.
pub const fn kind(&self) -> PayloadSparseTrieKind {
self.kind
}
/// Takes a preserved trie for `parent_state_root` or creates a new trie if the cache is empty.
pub fn take_or_create_for(&self, parent_state_root: B256) -> SparseTrieCheckout {
let start = Instant::now();
let mut state = self.state.lock();
state.latest_checkout_id += 1;
let checkout_id = state.latest_checkout_id;
let trie = state
.preserved
.take()
.map(|preserved| preserved.into_trie_for(parent_state_root))
.unwrap_or_else(|| {
debug!(
target: "engine::tree::payload_processor",
%parent_state_root,
kind = ?self.kind,
"Creating new sparse trie - no preserved trie available"
);
new_sparse_trie(self.kind)
});
drop(state);
let elapsed = start.elapsed();
if elapsed.as_millis() > 5 {
debug!(
target: "engine::tree::payload_processor",
blocked_for=?elapsed,
"Waited for preserved sparse trie checkout"
);
}
SparseTrieCheckout { trie: Some(trie), cache: self.clone(), checkout_id }
/// Acquires a guard that blocks `take()` until dropped.
/// Use this before sending the state root result to ensure the next block
/// waits for the trie to be stored.
pub(super) fn lock(&self) -> PreservedTrieGuard<'_> {
PreservedTrieGuard(self.0.lock())
}
/// Waits until the sparse trie lock becomes available.
///
/// This acquires and immediately releases the lock, ensuring that any
/// ongoing operations complete before returning. Useful for synchronization
/// before starting payload processing.
///
/// Returns the time spent waiting for the lock.
pub fn wait_for_availability(&self) -> Duration {
pub(super) fn wait_for_availability(&self) -> std::time::Duration {
let start = Instant::now();
let _guard = self.state.lock();
let _guard = self.0.lock();
let elapsed = start.elapsed();
if elapsed.as_millis() > 5 {
debug!(
@@ -133,142 +49,27 @@ impl PayloadSparseTrieCache {
}
elapsed
}
/// Acquires a guard that blocks cache mutation until dropped.
///
/// Engine-internal code uses this before making the state-root result visible so the next
/// payload cannot observe an empty cache between send and store.
pub(super) fn lock(&self) -> PreservedTrieGuard<'_> {
PreservedTrieGuard { state: self.state.lock() }
}
}
/// A checked-out sparse trie lease.
///
/// This dereferences to [`SparseStateTrie`] so callers can reuse the trie directly. If the lease is
/// dropped without being stored back, a cleared trie is returned to the shared cache unless a newer
/// checkout has already superseded it.
#[derive(Debug)]
pub struct SparseTrieCheckout {
trie: Option<SparseTrie>,
cache: PayloadSparseTrieCache,
checkout_id: u64,
}
impl SparseTrieCheckout {
/// Stores the trie back into the shared cache anchored to the given state root.
pub fn store_anchored(self, state_root: B256) -> PayloadSparseTrieStoreOutcome {
let cache = self.cache.clone();
let mut guard = cache.lock();
self.store_anchored_with_guard(&mut guard, state_root)
}
/// Stores the trie back into the shared cache in a cleared state.
pub fn store_cleared(mut self) -> PayloadSparseTrieStoreOutcome {
let cache = self.cache.clone();
let mut trie = self.take_trie();
prepare_cleared_trie(&mut trie);
let deferred = trie.take_deferred_drops();
let mut guard = cache.lock();
let outcome = guard.store(self.checkout_id, PreservedSparseTrie::cleared(trie));
drop(guard);
drop(deferred);
outcome
}
/// Stores the trie back into the shared cache anchored to the given state root while the
/// caller is already holding the preservation lock.
pub(super) fn store_anchored_with_guard(
mut self,
guard: &mut PreservedTrieGuard<'_>,
state_root: B256,
) -> PayloadSparseTrieStoreOutcome {
guard.store(self.checkout_id, PreservedSparseTrie::anchored(self.take_trie(), state_root))
}
/// Stores an already-cleared trie back into the shared cache while the caller is already
/// holding the preservation lock.
pub(super) fn store_prepared_cleared_with_guard(
mut self,
guard: &mut PreservedTrieGuard<'_>,
) -> PayloadSparseTrieStoreOutcome {
guard.store(self.checkout_id, PreservedSparseTrie::cleared(self.take_trie()))
}
fn take_trie(&mut self) -> SparseTrie {
self.trie.take().expect("sparse trie checkout must hold a trie until it is stored")
}
}
impl Deref for SparseTrieCheckout {
type Target = SparseTrie;
fn deref(&self) -> &Self::Target {
self.trie.as_ref().expect("sparse trie checkout must hold a trie until it is stored")
}
}
impl DerefMut for SparseTrieCheckout {
fn deref_mut(&mut self) -> &mut Self::Target {
self.trie.as_mut().expect("sparse trie checkout must hold a trie until it is stored")
}
}
impl Drop for SparseTrieCheckout {
fn drop(&mut self) {
let Some(mut trie) = self.trie.take() else { return };
debug!(
target: "engine::tree::payload_processor",
checkout_id = self.checkout_id,
"Sparse trie checkout dropped before store, returning cleared trie to cache"
);
prepare_cleared_trie(&mut trie);
let deferred = trie.take_deferred_drops();
let mut guard = self.cache.lock();
let _ = guard.store(self.checkout_id, PreservedSparseTrie::cleared(trie));
drop(guard);
drop(deferred);
}
}
/// Guard that holds the lock on the preserved trie.
/// While held, take-or-create calls will block. Call `store()` to save the trie before dropping.
pub(super) struct PreservedTrieGuard<'a> {
state: parking_lot::MutexGuard<'a, PayloadSparseTrieState>,
}
/// While held, `take()` will block. Call `store()` to save the trie before dropping.
pub(super) struct PreservedTrieGuard<'a>(parking_lot::MutexGuard<'a, Option<PreservedSparseTrie>>);
impl PreservedTrieGuard<'_> {
/// Stores a preserved trie for later reuse if the checkout is still current.
fn store(
&mut self,
checkout_id: u64,
trie: PreservedSparseTrie,
) -> PayloadSparseTrieStoreOutcome {
if checkout_id != self.state.latest_checkout_id {
debug!(
target: "engine::tree::payload_processor",
checkout_id,
latest_checkout_id = self.state.latest_checkout_id,
"Ignoring stale sparse trie checkout"
);
return PayloadSparseTrieStoreOutcome::IgnoredStaleCheckout;
}
self.state.preserved.replace(trie);
PayloadSparseTrieStoreOutcome::Stored
/// Stores a preserved trie for later reuse.
pub(super) fn store(&mut self, trie: PreservedSparseTrie) {
self.0.replace(trie);
}
}
/// A preserved sparse trie that can be reused across payload validations.
///
/// The trie exists in one of two states:
/// - **Anchored**: Has a computed state root and can be reused for payloads whose parent state
/// root matches the anchor.
/// - **Anchored**: Has a computed state root and can be reused for payloads whose parent state root
/// matches the anchor.
/// - **Cleared**: Trie data has been cleared but allocations are preserved for reuse.
#[derive(Debug)]
enum PreservedSparseTrie {
pub(super) enum PreservedSparseTrie {
/// Trie with a computed state root that can be reused for continuation payloads.
Anchored {
/// The sparse state trie (pruned after root computation).
@@ -286,17 +87,24 @@ enum PreservedSparseTrie {
impl PreservedSparseTrie {
/// Creates a new anchored preserved trie.
const fn anchored(trie: SparseTrie, state_root: B256) -> Self {
///
/// The `state_root` is the computed state root from the trie, which becomes the
/// anchor for determining if subsequent payloads can reuse this trie.
pub(super) const fn anchored(trie: SparseTrie, state_root: B256) -> Self {
Self::Anchored { trie, state_root }
}
/// Creates a cleared preserved trie (allocations preserved, data cleared).
const fn cleared(trie: SparseTrie) -> Self {
pub(super) const fn cleared(trie: SparseTrie) -> Self {
Self::Cleared { trie }
}
/// Consumes self and returns the trie for reuse.
fn into_trie_for(self, parent_state_root: B256) -> SparseTrie {
///
/// If the preserved trie is anchored and the parent state root matches, the pruned
/// trie structure is reused directly. Otherwise, the trie is cleared but allocations
/// are preserved to reduce memory overhead.
pub(super) fn into_trie_for(self, parent_state_root: B256) -> SparseTrie {
match self {
Self::Anchored { trie, state_root } if state_root == parent_state_root => {
debug!(
@@ -327,111 +135,3 @@ impl PreservedSparseTrie {
}
}
}
fn new_sparse_trie(kind: PayloadSparseTrieKind) -> SparseTrie {
let default_trie = match kind {
PayloadSparseTrieKind::HashMap => {
RevealableSparseTrie::blind_from(ConfigurableSparseTrie::HashMap(
ParallelSparseTrie::default()
.with_parallelism_thresholds(PARALLEL_SPARSE_TRIE_PARALLELISM_THRESHOLDS),
))
}
PayloadSparseTrieKind::Arena => RevealableSparseTrie::blind_from(
ConfigurableSparseTrie::Arena(ArenaParallelSparseTrie::default()),
),
};
SparseStateTrie::default()
.with_accounts_trie(default_trie.clone())
.with_default_storage_trie(default_trie)
.with_updates(true)
}
fn prepare_cleared_trie(trie: &mut SparseTrie) {
trie.clear();
trie.shrink_to(SPARSE_TRIE_MAX_NODES_SHRINK_CAPACITY, SPARSE_TRIE_MAX_VALUES_SHRINK_CAPACITY);
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn take_or_create_reuses_matching_anchor() {
let cache = PayloadSparseTrieCache::default();
let state_root = B256::with_last_byte(1);
assert_eq!(
cache.take_or_create_for(state_root).store_anchored(state_root),
PayloadSparseTrieStoreOutcome::Stored
);
match cache.state.lock().preserved.as_ref() {
Some(PreservedSparseTrie::Anchored { state_root: anchored, .. }) => {
assert_eq!(*anchored, state_root);
}
other => panic!("expected anchored trie, got {other:?}"),
}
}
#[test]
fn drop_restores_cleared_trie() {
let cache = PayloadSparseTrieCache::default();
let state_root = B256::with_last_byte(2);
let mut checkout = cache.take_or_create_for(state_root);
checkout.set_updates(true);
drop(checkout);
match cache.state.lock().preserved.as_ref() {
Some(PreservedSparseTrie::Cleared { .. }) => {}
other => panic!("expected cleared trie, got {other:?}"),
}
}
#[test]
fn stale_checkout_does_not_overwrite_newer_store() {
let cache = PayloadSparseTrieCache::default();
let parent_state_root = B256::with_last_byte(3);
let anchored_state_root = B256::with_last_byte(4);
let stale = cache.take_or_create_for(parent_state_root);
let fresh = cache.take_or_create_for(parent_state_root);
assert_eq!(
fresh.store_anchored(anchored_state_root),
PayloadSparseTrieStoreOutcome::Stored
);
assert_eq!(stale.store_cleared(), PayloadSparseTrieStoreOutcome::IgnoredStaleCheckout);
match cache.state.lock().preserved.as_ref() {
Some(PreservedSparseTrie::Anchored { state_root, .. }) => {
assert_eq!(*state_root, anchored_state_root);
}
other => panic!("expected anchored trie to survive stale checkout, got {other:?}"),
}
}
#[test]
fn stale_checkout_drop_does_not_overwrite_newer_store() {
let cache = PayloadSparseTrieCache::default();
let parent_state_root = B256::with_last_byte(5);
let anchored_state_root = B256::with_last_byte(6);
let stale = cache.take_or_create_for(parent_state_root);
let fresh = cache.take_or_create_for(parent_state_root);
assert_eq!(
fresh.store_anchored(anchored_state_root),
PayloadSparseTrieStoreOutcome::Stored
);
drop(stale);
match cache.state.lock().preserved.as_ref() {
Some(PreservedSparseTrie::Anchored { state_root, .. }) => {
assert_eq!(*state_root, anchored_state_root);
}
other => panic!("expected anchored trie to survive stale checkout drop, got {other:?}"),
}
}
}

View File

@@ -84,7 +84,7 @@ where
Evm: ConfigureEvm<Primitives = N> + 'static,
{
/// Initializes the task with the given transactions pending execution
pub(crate) fn new(
pub fn new(
executor: Runtime,
execution_cache: PayloadExecutionCache,
ctx: PrewarmContext<N, P, Evm>,

View File

@@ -7,7 +7,7 @@ use crate::tree::{
dispatch_with_chunking, evm_state_to_hashed_post_state, MultiProofMessage,
DEFAULT_MAX_TARGETS_FOR_CHUNKING,
},
payload_processor::{multiproof::MultiProofTaskMetrics, SparseTrieCheckout},
payload_processor::multiproof::MultiProofTaskMetrics,
};
use alloy_primitives::B256;
use alloy_rlp::{Decodable, Encodable};
@@ -29,17 +29,17 @@ use reth_trie_parallel::{
#[cfg(feature = "trie-debug")]
use reth_trie_sparse::debug_recorder::TrieDebugRecorder;
use reth_trie_sparse::{
errors::SparseTrieResult, ConfigurableSparseTrie, DeferredDrops, LeafUpdate,
RevealableSparseTrie,
errors::SparseTrieResult, DeferredDrops, LeafUpdate, ParallelSparseTrie, RevealableSparseTrie,
SparseStateTrie, SparseTrie,
};
use revm_primitives::{hash_map::Entry, B256Map};
use tracing::{debug, debug_span, error, instrument, trace_span};
/// Maximum number of pending/prewarm updates that we accumulate in memory before actually applying.
const MAX_PENDING_UPDATES: usize = 100;
const MAX_PENDING_UPDATES: usize = 50;
/// Sparse trie task implementation that uses in-memory sparse trie data to schedule proof fetching.
pub(super) struct SparseTrieCacheTask {
pub(super) struct SparseTrieCacheTask<A = ParallelSparseTrie, S = ParallelSparseTrie> {
/// Sender for proof results.
proof_result_tx: CrossbeamSender<ProofResultMessage>,
/// Receiver for proof results directly from workers.
@@ -47,7 +47,7 @@ pub(super) struct SparseTrieCacheTask {
/// Receives updates from execution and prewarming.
updates: CrossbeamReceiver<SparseTrieTaskMessage>,
/// `SparseStateTrie` used for computing the state root.
trie: SparseTrieCheckout,
trie: SparseStateTrie<A, S>,
/// Handle to the proof worker pools (storage and account).
proof_worker_handle: ProofWorkerHandle,
@@ -110,14 +110,18 @@ pub(super) struct SparseTrieCacheTask {
metrics: MultiProofTaskMetrics,
}
impl SparseTrieCacheTask {
impl<A, S> SparseTrieCacheTask<A, S>
where
A: SparseTrie + Default,
S: SparseTrie + Default + Clone,
{
/// Creates a new sparse trie, pre-populating with an existing [`SparseStateTrie`].
pub(super) fn new_with_checkout(
pub(super) fn new_with_trie(
executor: &Runtime,
updates: CrossbeamReceiver<MultiProofMessage>,
proof_worker_handle: ProofWorkerHandle,
metrics: MultiProofTaskMetrics,
trie: SparseTrieCheckout,
trie: SparseStateTrie<A, S>,
chunk_size: usize,
) -> Self {
let (proof_result_tx, proof_result_rx) = crossbeam_channel::unbounded();
@@ -201,7 +205,7 @@ impl SparseTrieCacheTask {
max_values_capacity: usize,
disable_pruning: bool,
updates: &TrieUpdates,
) -> (SparseTrieCheckout, DeferredDrops) {
) -> (SparseStateTrie<A, S>, DeferredDrops) {
let Self { mut trie, .. } = self;
trie.commit_updates(updates);
if !disable_pruning {
@@ -220,7 +224,7 @@ impl SparseTrieCacheTask {
self,
max_nodes_capacity: usize,
max_values_capacity: usize,
) -> (SparseTrieCheckout, DeferredDrops) {
) -> (SparseStateTrie<A, S>, DeferredDrops) {
let Self { mut trie, .. } = self;
trie.clear();
trie.shrink_to(max_nodes_capacity, max_values_capacity);
@@ -302,9 +306,9 @@ impl SparseTrieCacheTask {
self.promote_pending_account_updates()?;
self.metrics.sparse_trie_process_updates_duration_histogram.record(t.elapsed());
if self.finished_state_updates
&& self.account_updates.is_empty()
&& self.storage_updates.iter().all(|(_, updates)| updates.is_empty())
if self.finished_state_updates &&
self.account_updates.is_empty() &&
self.storage_updates.iter().all(|(_, updates)| updates.is_empty())
{
break;
}
@@ -378,13 +382,13 @@ impl SparseTrieCacheTask {
}
for (address, slots) in targets.storage_targets {
if !slots.is_empty() {
// Look up outer map once per address instead of once per slot.
let new_updates = self.new_storage_updates.entry(address).or_default();
for slot in slots {
// Only touch storages that are not yet present in the updates set.
new_updates.entry(slot.key()).or_insert(LeafUpdate::Touched);
}
for slot in slots {
// Only touch storages that are not yet present in the updates set.
self.new_storage_updates
.entry(address)
.or_default()
.entry(slot.key())
.or_insert(LeafUpdate::Touched);
}
// Touch corresponding account leaf to make sure its revealed in accounts trie for
@@ -401,26 +405,21 @@ impl SparseTrieCacheTask {
)]
fn on_hashed_state_update(&mut self, hashed_state_update: HashedPostState) {
for (address, storage) in hashed_state_update.storages {
if !storage.storage.is_empty() {
// Look up outer maps once per address instead of once per slot.
let new_updates = self.new_storage_updates.entry(address).or_default();
let mut existing_updates = self.storage_updates.get_mut(&address);
for (slot, value) in storage.storage {
self.trie.record_slot_touch(address, slot);
for (slot, value) in storage.storage {
self.trie.record_slot_touch(address, slot);
let encoded = if value.is_zero() {
Vec::new()
} else {
alloy_rlp::encode_fixed_size(&value).to_vec()
};
self.new_storage_updates
.entry(address)
.or_default()
.insert(slot, LeafUpdate::Changed(encoded));
let encoded = if value.is_zero() {
Vec::new()
} else {
alloy_rlp::encode_fixed_size(&value).to_vec()
};
new_updates.insert(slot, LeafUpdate::Changed(encoded));
// Remove an existing storage update if it exists.
if let Some(ref mut existing) = existing_updates {
existing.remove(&slot);
}
}
// Remove an existing storage update if it exists.
self.storage_updates.get_mut(&address).and_then(|updates| updates.remove(&slot));
}
// Make sure account is tracked in `account_updates` so that it is revealed in accounts
@@ -596,6 +595,7 @@ impl SparseTrieCacheTask {
Ok(updates_len_after < updates_len_before)
}
/// Computes storage roots for accounts whose storage updates are fully drained.
///
/// For each storage trie T that:
@@ -616,16 +616,16 @@ impl SparseTrieCacheTask {
.filter_map(|(address, updates)| updates.is_empty().then_some(*address))
.collect();
struct SendStorageTriePtr(*mut RevealableSparseTrie<ConfigurableSparseTrie>);
struct SendStorageTriePtr<S>(*mut RevealableSparseTrie<S>);
// SAFETY: this wrapper only forwards the pointer across rayon; deref invariants are
// documented at the use site below.
unsafe impl Send for SendStorageTriePtr {}
unsafe impl<S: Send> Send for SendStorageTriePtr<S> {}
let mut tries_to_compute_roots: Vec<(B256, SendStorageTriePtr)> =
let mut tries_to_compute_roots: Vec<(B256, SendStorageTriePtr<S>)> =
Vec::with_capacity(addresses_to_compute_roots.len());
for address in addresses_to_compute_roots {
if let Some(trie) = self.trie.storage_tries_mut().get_mut(&address)
&& !trie.is_root_cached()
if let Some(trie) = self.trie.storage_tries_mut().get_mut(&address) &&
!trie.is_root_cached()
{
tries_to_compute_roots.push((address, SendStorageTriePtr(trie)));
}
@@ -724,7 +724,7 @@ impl SparseTrieCacheTask {
// We need to keep iterating if any updates are being drained because that might
// indicate that more pending account updates can be promoted.
if num_promoted == 0 || !self.process_account_leaf_updates(false)? {
break;
break
}
}
@@ -845,6 +845,7 @@ pub struct StateRootComputeOutcome {
mod tests {
use super::*;
use alloy_primitives::{keccak256, Address, B256, U256};
use reth_trie_sparse::ParallelSparseTrie;
#[test]
fn test_run_hashing_task_hashed_state_update_forwards() {
@@ -867,7 +868,10 @@ mod tests {
let expected_state = hashed_state.clone();
let handle = std::thread::spawn(move || {
SparseTrieCacheTask::run_hashing_task(updates_rx, hashed_state_tx);
SparseTrieCacheTask::<ParallelSparseTrie, ParallelSparseTrie>::run_hashing_task(
updates_rx,
hashed_state_tx,
);
});
updates_tx.send(MultiProofMessage::HashedStateUpdate(hashed_state)).unwrap();

View File

@@ -4,7 +4,7 @@ use crate::tree::{
cached_state::{CacheStats, CachedStateProvider},
error::{InsertBlockError, InsertBlockErrorKind, InsertPayloadError},
instrumented_state::{InstrumentedStateProvider, StateProviderStats},
payload_processor::{EngineSharedCaches, PayloadProcessor},
payload_processor::PayloadProcessor,
precompile_cache::{CachedPrecompile, CachedPrecompileMetrics, PrecompileCacheMap},
sparse_trie::StateRootComputeOutcome,
CacheWaitDurations, EngineApiMetrics, EngineApiTreeState, ExecutionEnv, PayloadHandle,
@@ -190,13 +190,16 @@ where
validator: V,
config: TreeConfig,
invalid_block_hook: Box<dyn InvalidBlockHook<N>>,
shared_caches: EngineSharedCaches<Evm>,
changeset_cache: ChangesetCache,
runtime: reth_tasks::Runtime,
) -> Self {
let precompile_cache_map = shared_caches.precompile_cache_map();
let payload_processor =
PayloadProcessor::new(runtime.clone(), evm_config.clone(), &config, shared_caches);
let precompile_cache_map = PrecompileCacheMap::default();
let payload_processor = PayloadProcessor::new(
runtime.clone(),
evm_config.clone(),
&config,
precompile_cache_map.clone(),
);
Self {
provider,
consensus,
@@ -310,7 +313,7 @@ where
// Validate block consensus rules which includes header validation
if let Err(consensus_err) = self.validate_block_inner(&block, None) {
// Header validation error takes precedence over execution error
return Err(InsertBlockError::new(block, consensus_err.into()).into());
return Err(InsertBlockError::new(block, consensus_err.into()).into())
}
// Also validate against the parent
@@ -318,7 +321,7 @@ where
self.consensus.validate_header_against_parent(block.sealed_header(), parent_block)
{
// Parent validation error takes precedence over execution error
return Err(InsertBlockError::new(block, consensus_err.into()).into());
return Err(InsertBlockError::new(block, consensus_err.into()).into())
}
// No header validation errors, return the original execution error
@@ -393,7 +396,7 @@ where
Ok(val) => val,
Err(e) => {
let block = convert_to_block(input)?;
return Err(InsertBlockError::new(block, e.into()).into());
return Err(InsertBlockError::new(block, e.into()).into())
}
}
};
@@ -426,7 +429,7 @@ where
convert_to_block(input)?,
ProviderError::HeaderNotFound(parent_hash.into()).into(),
)
.into());
.into())
};
let mut state_provider = ensure_ok!(provider_builder.build());
drop(_enter);
@@ -439,7 +442,7 @@ where
convert_to_block(input)?,
ProviderError::HeaderNotFound(parent_hash.into()).into(),
)
.into());
.into())
};
let evm_env = debug_span!(target: "engine::tree::payload_validator", "evm_env")
@@ -759,7 +762,7 @@ where
)
.into(),
)
.into());
.into())
}
let timing_stats = state_provider_stats.map(|stats| {
@@ -821,14 +824,14 @@ where
) -> Result<(), ConsensusError> {
if let Err(e) = self.consensus.validate_header(block.sealed_header()) {
error!(target: "engine::tree::payload_validator", ?block, "Failed to validate header {}: {e}", block.hash());
return Err(e);
return Err(e)
}
if let Err(e) =
self.consensus.validate_block_pre_execution_with_tx_root(block, transaction_root)
{
error!(target: "engine::tree::payload_validator", ?block, "Failed to validate block {}: {e}", block.hash());
return Err(e);
return Err(e)
}
Ok(())
@@ -1006,7 +1009,6 @@ where
let _enter = debug_span!(
target: "engine::tree",
"execute tx",
tx_index = senders.len() - 1,
)
.entered();
trace!(target: "engine::tree", "Executing transaction");
@@ -1320,7 +1322,7 @@ where
trace!(target: "engine::tree::payload_validator", block=?block.num_hash(), "Validating block consensus");
// validate block consensus rules
if let Err(e) = self.validate_block_inner(block, transaction_root) {
return Err(e.into());
return Err(e.into())
}
// now validate against the parent
@@ -1329,7 +1331,7 @@ where
self.consensus.validate_header_against_parent(block.sealed_header(), parent_block)
{
warn!(target: "engine::tree::payload_validator", ?block, "Failed to validate header {} against parent: {e}", block.hash());
return Err(e.into());
return Err(e.into())
}
drop(_enter);
@@ -1342,7 +1344,7 @@ where
{
// call post-block hook
self.on_invalid_block(parent_block, block, output, None, ctx.state_mut());
return Err(err.into());
return Err(err.into())
}
drop(_enter);
@@ -1358,7 +1360,7 @@ where
{
// call post-block hook
self.on_invalid_block(parent_block, block, output, None, ctx.state_mut());
return Err(err.into());
return Err(err.into())
}
// record post-execution validation duration
@@ -1466,7 +1468,7 @@ where
self.provider.clone(),
historical,
Some(blocks),
)));
)))
}
// Check if the block is persisted
@@ -1474,7 +1476,7 @@ where
debug!(target: "engine::tree::payload_validator", %hash, number = %header.number(), "found canonical state for block in database, creating provider builder");
// For persisted blocks, we create a builder that will fetch state directly from the
// database
return Ok(Some(StateProviderBuilder::new(self.provider.clone(), hash, None)));
return Ok(Some(StateProviderBuilder::new(self.provider.clone(), hash, None)))
}
debug!(target: "engine::tree::payload_validator", %hash, "no canonical state found for block");
@@ -1506,7 +1508,7 @@ where
) {
if state.invalid_headers.get(&block.hash()).is_some() {
// we already marked this block as invalid
return;
return
}
self.invalid_block_hook.on_invalid_block(parent_header, block, output, trie_updates);
}

View File

@@ -169,11 +169,11 @@ where
}
fn call(&self, input: PrecompileInput<'_>) -> PrecompileResult {
if let Some(entry) = &self.cache.get(input.data, self.spec_id.clone()) &&
input.gas >= entry.gas_used()
{
if let Some(entry) = &self.cache.get(input.data, self.spec_id.clone()) {
self.increment_by_one_precompile_cache_hits();
return entry.to_precompile_result()
if input.gas >= entry.gas_used() {
return entry.to_precompile_result()
}
}
let calldata = input.data;

View File

@@ -203,7 +203,6 @@ impl TestHarness {
payload_validator,
TreeConfig::default(),
Box::new(NoopInvalidBlockHook::default()),
EngineSharedCaches::default(),
changeset_cache.clone(),
reth_tasks::Runtime::test(),
);
@@ -408,7 +407,6 @@ impl ValidatorTestHarness {
payload_validator,
TreeConfig::default(),
Box::new(NoopInvalidBlockHook::default()),
EngineSharedCaches::default(),
changeset_cache,
reth_tasks::Runtime::test(),
);

View File

@@ -1,27 +0,0 @@
//! SDK smoke tests for `EngineSharedCaches`.
use alloy_primitives::B256;
use reth_engine_tree::tree::{
EngineSharedCaches, PayloadSparseTrieKind, PayloadSparseTrieStoreOutcome,
};
use reth_evm_ethereum::EthEvmConfig;
#[test]
fn engine_shared_caches_exposes_public_sparse_trie_sdk() {
let caches =
EngineSharedCaches::<EthEvmConfig>::with_sparse_trie_kind(PayloadSparseTrieKind::Arena);
let _precompile_cache_map = caches.precompile_cache_map();
let sparse_trie_cache = caches.sparse_trie_cache();
assert_eq!(sparse_trie_cache.kind(), PayloadSparseTrieKind::Arena);
let state_root = B256::with_last_byte(1);
assert_eq!(
sparse_trie_cache.take_or_create_for(state_root).store_anchored(state_root),
PayloadSparseTrieStoreOutcome::Stored
);
let checkout = sparse_trie_cache.take_or_create_for(state_root);
assert!(checkout.memory_size() > 0 || checkout.retained_storage_tries_count() == 0);
}

View File

@@ -4,7 +4,6 @@ use eyre::{eyre, OptionExt};
use futures_util::{stream::StreamExt, Stream, TryStreamExt};
use reqwest::{Client, IntoUrl, Url};
use reth_era::common::file_ops::EraFileType;
use reth_fs_util::FsPathError;
use sha2::{Digest, Sha256};
use std::{future::Future, path::Path, str::FromStr};
use tokio::{
@@ -137,7 +136,7 @@ impl<Http: HttpClient + Clone> EraClient<Http> {
let Some(number) = self.file_name_to_number(name) &&
(number < index || number >= last)
{
remove_file_ignore_not_found(entry.path())?;
reth_fs_util::remove_file(entry.path())?;
}
}
}
@@ -322,16 +321,6 @@ impl<Http: HttpClient + Clone> EraClient<Http> {
}
}
fn remove_file_ignore_not_found(path: impl AsRef<Path>) -> eyre::Result<()> {
match reth_fs_util::remove_file(path) {
Ok(()) => Ok(()),
Err(FsPathError::RemoveFile { source, .. }) if source.kind() == io::ErrorKind::NotFound => {
Ok(())
}
Err(err) => Err(err.into()),
}
}
async fn checksum(mut reader: impl AsyncRead + Unpin) -> eyre::Result<Vec<u8>> {
let mut hasher = Sha256::new();
@@ -378,25 +367,4 @@ mod tests {
assert_eq!(actual_number, expected_number);
}
/// Deleting a path that was never created is reported as success.
#[test]
fn test_remove_file_ignore_not_found() {
    let dir = tempfile::tempdir().unwrap();
    let missing = dir.path().join("missing.era1");

    let outcome = remove_file_ignore_not_found(&missing);
    assert!(outcome.is_ok());
}
/// Pointing the helper at a directory must fail, and the failure must be a `RemoveFile`
/// error whose kind is something other than `NotFound`.
#[test]
fn test_remove_file_ignore_not_found_preserves_other_errors() {
    let dir = tempfile::tempdir().unwrap();
    let dir_path = dir.path().join("dir");
    std::fs::create_dir_all(&dir_path).unwrap();

    let report = remove_file_ignore_not_found(&dir_path).unwrap_err();

    let is_other_remove_error = matches!(
        report.downcast_ref::<FsPathError>(),
        Some(FsPathError::RemoveFile { source, .. }) if source.kind() != io::ErrorKind::NotFound
    );
    assert!(is_other_remove_error);
}
}

View File

@@ -36,6 +36,8 @@ tracing.workspace = true
tempfile.workspace = true
[features]
default = []
otlp = ["reth-tracing/otlp", "reth-node-core/otlp"]
otlp-logs = ["reth-tracing/otlp-logs", "reth-node-core/otlp-logs"]
@@ -87,3 +89,6 @@ min-trace-logs = [
"tracing/release_max_level_trace",
"reth-node-core/min-trace-logs",
]
rocksdb = ["reth-cli-commands/rocksdb"]
edge = ["rocksdb"]

View File

@@ -78,43 +78,3 @@ async fn test_simulate_v1_with_max_fee_per_blob_gas_only() -> eyre::Result<()> {
Ok(())
}
/// Sends an `eth_simulateV1` request carrying 257 blocks and expects the node to answer
/// with JSON-RPC error code `-38026` ("too many blocks").
#[tokio::test]
async fn test_simulate_v1_too_many_blocks_error() -> eyre::Result<()> {
    reth_tracing::init_test_tracing();

    // Mainnet chain id on top of the test genesis, with Cancun active.
    let genesis = serde_json::from_str(include_str!("../assets/genesis.json")).unwrap();
    let chain_spec = Arc::new(
        ChainSpecBuilder::default().chain(MAINNET.chain).genesis(genesis).cancun_activated().build(),
    );

    let (mut nodes, wallet) =
        setup_engine::<EthereumNode>(1, chain_spec, false, Default::default(), eth_payload_attributes)
            .await?;
    let node = nodes.pop().unwrap();

    let signer = EthereumWallet::new(wallet.wallet_gen().swap_remove(0));
    let provider = ProviderBuilder::new().wallet(signer).connect_http(node.rpc_url());

    // Build a payload made of 257 default blocks.
    let mut payload: SimulatePayload<TransactionRequest> = SimulatePayload::default();
    for _ in 0..257 {
        payload = payload.extend(SimBlock::default());
    }

    let rpc_err = provider
        .raw_request::<_, Vec<SimulatedBlock>>("eth_simulateV1".into(), (&payload, "latest"))
        .await
        .unwrap_err();

    let resp = rpc_err.as_error_resp().expect("expected JSON-RPC error response");
    assert_eq!(resp.code, -38026);
    assert_eq!(resp.message, "too many blocks");

    Ok(())
}

View File

@@ -85,7 +85,4 @@ serde = [
"alloy-rpc-types-eth?/serde",
"rand/serde",
]
rpc = [
"dep:alloy-rpc-types-eth",
"alloy-rpc-types-eth?/serde",
]
rpc = ["dep:alloy-rpc-types-eth"]

View File

@@ -66,6 +66,8 @@ secp256k1.workspace = true
tempfile.workspace = true
[features]
default = []
edge = ["reth-provider/edge"]
serde = [
"reth-exex-types/serde",
"reth-revm/serde",

View File

@@ -3,7 +3,7 @@ use alloy_eips::BlockHashOrNumber;
use alloy_primitives::{BlockHash, BlockNumber, Sealable, B256};
use async_compression::tokio::bufread::GzipDecoder;
use futures::Future;
use itertools::{Either, Itertools};
use itertools::Either;
use reth_consensus::{Consensus, ConsensusError};
use reth_network_p2p::{
bodies::client::{BodiesClient, BodiesFut},
@@ -163,9 +163,17 @@ impl<B: FullBlock> FileClient<B> {
if self.headers.is_empty() {
return true
}
let (min, max) = self.headers.keys().minmax().into_option().expect("not empty");
// Contiguous range from min to max means no gaps
*max - *min + 1 == self.headers.len() as u64
let mut nums = self.headers.keys().copied().collect::<Vec<_>>();
nums.sort_unstable();
let mut iter = nums.into_iter();
let mut lowest = iter.next().expect("not empty");
for next in iter {
if next != lowest + 1 {
return false
}
lowest = next;
}
true
}
/// Use the provided bodies as the file client's block body buffer.

View File

@@ -63,28 +63,6 @@ impl EthStreamError {
}
}
/// Reports whether this error indicates a protocol breach on the receive side.
///
/// That is the case for errors caused by the remote peer sending invalid or malformed
/// data, which warrant disconnecting with [`DisconnectReason::ProtocolBreach`].
pub const fn is_protocol_breach(&self) -> bool {
    match self {
        // breaches detected directly on the eth stream
        Self::InvalidMessage(_) |
        Self::MessageTooBig(_) |
        Self::TransactionHashesInvalidLenOfFields { .. } |
        Self::UnsupportedMessage { .. } => true,
        // breaches surfaced by the underlying p2p stream
        Self::P2PStreamError(p2p_err) => matches!(
            p2p_err,
            P2PStreamError::Rlp(_) |
                P2PStreamError::Snap(_) |
                P2PStreamError::MessageTooBig { .. } |
                P2PStreamError::UnknownReservedMessageId(_) |
                P2PStreamError::EmptyProtocolMessage |
                P2PStreamError::UnknownDisconnectReason(_)
        ),
        // everything else is not attributed to the peer misbehaving
        _ => false,
    }
}
/// Returns the [`io::Error`] if it was caused by IO
pub const fn as_io(&self) -> Option<&io::Error> {
if let Self::P2PStreamError(P2PStreamError::Io(io)) = self {

View File

@@ -21,7 +21,7 @@ alloy-eip2124.workspace = true
# misc
serde = { workspace = true, optional = true }
humantime-serde = { workspace = true, optional = true }
serde_json = { workspace = true, features = ["std"], optional = true }
serde_json = { workspace = true, features = ["std"] }
# misc
tracing.workspace = true
@@ -30,7 +30,6 @@ tracing.workspace = true
serde = [
"dep:serde",
"dep:humantime-serde",
"dep:serde_json",
"alloy-eip2124/serde",
]
test-utils = []

View File

@@ -1,9 +1,15 @@
//! Configuration for peering.
use std::{collections::HashSet, time::Duration};
use std::{
collections::HashSet,
io::{self, ErrorKind},
path::Path,
time::Duration,
};
use reth_net_banlist::{BanList, IpFilter};
use reth_network_peers::{NodeRecord, TrustedPeer};
use tracing::info;
use crate::{peers::PersistedPeerInfo, BackoffKind, ReputationChangeWeights};
@@ -305,16 +311,16 @@ impl PeersConfig {
#[cfg(feature = "serde")]
pub fn with_basic_nodes_from_file(
mut self,
optional_file: Option<impl AsRef<std::path::Path>>,
) -> Result<Self, std::io::Error> {
optional_file: Option<impl AsRef<Path>>,
) -> Result<Self, io::Error> {
let Some(file_path) = optional_file else { return Ok(self) };
let raw = match std::fs::read_to_string(file_path.as_ref()) {
Ok(contents) => contents,
Err(e) if e.kind() == std::io::ErrorKind::NotFound => return Ok(self),
Err(e) if e.kind() == ErrorKind::NotFound => return Ok(self),
Err(e) => return Err(e),
};
tracing::info!(target: "net::peers", file = %file_path.as_ref().display(), "Loading saved peers");
info!(target: "net::peers", file = %file_path.as_ref().display(), "Loading saved peers");
// Try the new format first, fall back to legacy Vec<NodeRecord>
let peers: Vec<PersistedPeerInfo> = serde_json::from_str(&raw)
@@ -324,9 +330,9 @@ impl PeersConfig {
nodes.into_iter().map(PersistedPeerInfo::from_node_record).collect(),
)
})
.map_err(|e| std::io::Error::new(std::io::ErrorKind::InvalidData, e))?;
.map_err(|e| io::Error::new(io::ErrorKind::InvalidData, e))?;
tracing::info!(target: "net::peers", count = peers.len(), "Loaded persisted peers");
info!(target: "net::peers", count = peers.len(), "Loaded persisted peers");
self.persisted_peers = peers;
Ok(self)
}

View File

@@ -742,9 +742,7 @@ impl<N: NetworkPrimitives> Future for ActiveSession<N> {
}
OnIncomingMessageOutcome::BadMessage { error, message } => {
debug!(target: "net::session", %error, msg=?message, remote_peer_id=?this.remote_peer_id, "received invalid protocol message");
this.on_bad_message();
return this
.try_disconnect(DisconnectReason::ProtocolBreach, cx)
return this.close_on_error(error, cx)
}
OnIncomingMessageOutcome::NoCapacity(msg) => {
// failed to send due to lack of capacity
@@ -754,10 +752,6 @@ impl<N: NetworkPrimitives> Future for ActiveSession<N> {
}
Err(err) => {
debug!(target: "net::session", %err, remote_peer_id=?this.remote_peer_id, "failed to receive message");
if err.is_protocol_breach() {
this.on_bad_message();
return this.try_disconnect(DisconnectReason::ProtocolBreach, cx)
}
return this.close_on_error(err, cx)
}
}
@@ -972,7 +966,6 @@ mod tests {
GetBlockBodies, HelloMessageWithProtocols, P2PStream, StatusBuilder, UnauthedEthStream,
UnauthedP2PStream, UnifiedStatus,
};
use reth_eth_wire_types::{EthMessageID, RawCapabilityMessage};
use reth_ethereum_forks::EthereumHardfork;
use reth_network_peers::pk2id;
use reth_network_types::session::config::PROTOCOL_BREACH_REQUEST_TIMEOUT;
@@ -1168,40 +1161,6 @@ mod tests {
fut.await;
}
/// A client that sends a raw `PooledTransactions` message with the single-byte payload
/// `0xc0` must be disconnected with [`DisconnectReason::ProtocolBreach`].
#[tokio::test(flavor = "multi_thread")]
async fn test_invalid_message_disconnects_with_protocol_breach() {
    let mut builder = SessionBuilder::default();

    let listener = TcpListener::bind("127.0.0.1:0").await.unwrap();
    let listen_addr = listener.local_addr().unwrap();

    // Client side: push the raw message, then expect the stream to report the disconnect.
    let client_fut = builder.with_client_stream(listen_addr, async move |mut stream| {
        let raw_msg =
            RawCapabilityMessage::eth(EthMessageID::PooledTransactions, vec![0xc0].into());
        stream.start_send_raw(raw_msg).unwrap();
        stream.flush().await.unwrap();

        let err = stream.next().await.unwrap().unwrap_err();
        assert_eq!(err.as_disconnected(), Some(DisconnectReason::ProtocolBreach));
    });

    // Server side: accept the connection and drive the session until it finishes.
    let (done_tx, done_rx) = oneshot::channel();
    tokio::task::spawn(async move {
        let (incoming, _) = listener.accept().await.unwrap();
        let session = builder.connect_incoming(incoming).await;
        session.await;

        done_tx.send(()).unwrap();
    });

    client_fut.await;
    done_rx.await.unwrap();
}
#[tokio::test(flavor = "multi_thread")]
async fn handle_dropped_stream() {
let mut builder = SessionBuilder::default();

View File

@@ -442,12 +442,8 @@ impl<N: NetworkPrimitives> TransactionFetcher<N> {
search_durations.find_idle_peer
);
// peer may have disconnected between idle check and here, re-buffer hashes so they
// aren't lost from the pending fetch cache
let Some(peer) = peers.get(&peer_id) else {
self.buffer_hashes(hashes_to_request, None);
return false
};
// peer should always exist since `is_session_active` already checked
let Some(peer) = peers.get(&peer_id) else { return false };
let conn_eth_version = peer.version;
// fill the request with more hashes pending fetch that have been announced by the peer.
@@ -1453,26 +1449,6 @@ mod test {
)
}
/// A hash pending fetch must still be counted as pending after `on_fetch_pending_hashes`
/// runs against a peers map that contains none of the announcing peers.
#[test]
fn on_fetch_pending_hashes_rebuffers_on_disconnected_peer() {
    let mut fetcher = TransactionFetcher::default();

    let peer_1 = PeerId::new([1; 64]);
    let peer_2 = PeerId::new([2; 64]);
    let hash = B256::from_slice(&[1; 32]);

    // the same hash is announced by both peers
    for announcer in [peer_1, peer_2] {
        buffer_hash_to_tx_fetcher(&mut fetcher, hash, announcer, 0, Some(128));
    }
    assert_eq!(fetcher.num_pending_hashes(), 1);

    // an empty peers map means neither announcing peer can be looked up
    let no_peers = HashMap::new();
    fetcher.on_fetch_pending_hashes(&no_peers, |_| true);

    // the hash is expected to remain in the pending set
    assert_eq!(fetcher.num_pending_hashes(), 1);
}
#[test]
fn verify_response_hashes() {
let input = hex!(

View File

@@ -1,7 +1,6 @@
//! Transactions management for the p2p network.
use alloy_consensus::transaction::TxHashRef;
use itertools::Itertools;
use rayon::iter::{IntoParallelIterator, ParallelIterator};
/// Aggregation on configurable parameters for [`TransactionsManager`].
@@ -460,14 +459,6 @@ impl<Pool: TransactionPool, N: NetworkPrimitives> TransactionsManager<Pool, N> {
self.network.reputation_change(peer_id, ReputationChangeKind::AlreadySeenTransaction);
}
/// Cleans up transaction-local tracking state after a peer's session has closed.
///
/// The peer is dropped from `self.peers`; if it was tracked, the propagation policy is
/// notified. The transaction fetcher is told to forget the peer in either case.
fn on_peer_session_closed(&mut self, peer_id: &PeerId) {
    let removed = self.peers.remove(peer_id);

    if let Some(mut closed_peer) = removed {
        self.policies.propagation_policy_mut().on_session_closed(&mut closed_peer);
    }

    self.transaction_fetcher.remove_peer(peer_id);
}
/// Clear the transaction
fn on_good_import(&mut self, hash: TxHash) {
self.transactions_by_peers.remove(&hash);
@@ -767,7 +758,7 @@ impl<Pool: TransactionPool, N: NetworkPrimitives> TransactionsManager<Pool, N> {
trace!(target: "net::tx::propagation",
peer_id=format!("{peer_id:#}"),
hashes_len=valid_announcement_data.len(),
hashes=%valid_announcement_data.keys().format(", "),
hashes=?valid_announcement_data.keys().collect::<Vec<_>>(),
msg_version=%valid_announcement_data.msg_version(),
client_version=%client,
"received previously unseen and pending hashes in announcement from peer"
@@ -1254,7 +1245,13 @@ where
fn on_network_event(&mut self, event_result: NetworkEvent<PeerRequest<N>>) {
match event_result {
NetworkEvent::Peer(PeerEvent::SessionClosed { peer_id, .. }) => {
self.on_peer_session_closed(&peer_id);
// remove the peer
let peer = self.peers.remove(&peer_id);
if let Some(mut peer) = peer {
self.policies.propagation_policy_mut().on_session_closed(&mut peer);
}
self.transaction_fetcher.remove_peer(&peer_id);
}
NetworkEvent::ActivePeerSession { info, messages } => {
// process active peer session and broadcast available transaction from the pool
@@ -2169,7 +2166,7 @@ mod tests {
NetworkConfigBuilder, NetworkManager,
};
use alloy_consensus::{TxEip1559, TxLegacy};
use alloy_primitives::{hex, Signature, TxKind, B256, U256};
use alloy_primitives::{hex, Signature, TxKind, U256};
use alloy_rlp::Decodable;
use futures::FutureExt;
use reth_chainspec::MIN_TRANSACTION_GAS;
@@ -2513,46 +2510,6 @@ mod tests {
handle.terminate().await;
}
/// A `SessionClosed` event must remove the peer from the manager's peer map and from the
/// fetcher's active peers, while another announcer of the same hash stays available.
#[tokio::test(flavor = "multi_thread")]
async fn test_session_closed_cleans_transaction_peer_state() {
    let (mut tx_manager, _network) = new_tx_manager().await;

    let closing_peer = PeerId::new([1; 64]);
    let fallback_peer = PeerId::new([2; 64]);
    let shared_hash = B256::from_slice(&[1; 32]);

    // register the peer whose session is about to close
    let (session, _) = new_mock_session(closing_peer, EthVersion::Eth66);
    tx_manager.peers.insert(closing_peer, session);

    // both peers announce the same hash
    for announcer in [closing_peer, fallback_peer] {
        buffer_hash_to_tx_fetcher(
            &mut tx_manager.transaction_fetcher,
            shared_hash,
            announcer,
            0,
            None,
        );
    }
    tx_manager.transaction_fetcher.active_peers.insert(closing_peer, 1);

    let closed = PeerEvent::SessionClosed { peer_id: closing_peer, reason: None };
    tx_manager.on_network_event(NetworkEvent::Peer(closed));

    // the closed peer is gone from the peers map and from the fetcher's active peers
    assert!(!tx_manager.peers.contains_key(&closing_peer));
    assert!(tx_manager.transaction_fetcher.active_peers.peek(&closing_peer).is_none());

    // the other announcer can still serve the hash
    let idle = tx_manager.transaction_fetcher.get_idle_peer_for(shared_hash);
    assert_eq!(idle, Some(&fallback_peer));
}
#[tokio::test(flavor = "multi_thread")]
async fn test_on_get_pooled_transactions_network() {
reth_tracing::init_test_tracing();

View File

@@ -32,7 +32,7 @@ use reth_node_core::{
primitives::Head,
};
use reth_provider::{
providers::{BlockchainProvider, NodeTypesForProvider, RocksDBProvider},
providers::{BlockchainProvider, NodeTypesForProvider},
ChainSpecProvider, FullProvider,
};
use reth_tasks::TaskExecutor;
@@ -154,14 +154,12 @@ pub struct NodeBuilder<DB, ChainSpec> {
config: NodeConfig<ChainSpec>,
/// The configured database for the node.
database: DB,
/// An optional [`RocksDBProvider`] to use instead of creating one during launch.
rocksdb_provider: Option<RocksDBProvider>,
}
impl<ChainSpec> NodeBuilder<(), ChainSpec> {
/// Create a new [`NodeBuilder`].
pub const fn new(config: NodeConfig<ChainSpec>) -> Self {
Self { config, database: (), rocksdb_provider: None }
Self { config, database: () }
}
}
@@ -230,13 +228,7 @@ impl<DB, ChainSpec> NodeBuilder<DB, ChainSpec> {
impl<DB, ChainSpec: EthChainSpec> NodeBuilder<DB, ChainSpec> {
/// Configures the underlying database that the node will use.
pub fn with_database<D>(self, database: D) -> NodeBuilder<D, ChainSpec> {
NodeBuilder { config: self.config, database, rocksdb_provider: self.rocksdb_provider }
}
/// Sets the [`RocksDBProvider`] to use instead of creating one during launch.
pub fn with_rocksdb_provider(mut self, rocksdb_provider: RocksDBProvider) -> Self {
self.rocksdb_provider = Some(rocksdb_provider);
self
NodeBuilder { config: self.config, database }
}
/// Preconfigure the builder with the context to launch the node.
@@ -305,7 +297,7 @@ where
T: NodeTypesForProvider<ChainSpec = ChainSpec>,
P: FullProvider<NodeTypesWithDBAdapter<T, DB>>,
{
NodeBuilderWithTypes::new(self.config, self.database, self.rocksdb_provider)
NodeBuilderWithTypes::new(self.config, self.database)
}
/// Preconfigures the node with a specific node implementation.
@@ -355,12 +347,6 @@ where
DB: Database + DatabaseMetrics + Clone + Unpin + 'static,
ChainSpec: EthChainSpec + EthereumHardforks,
{
/// Sets the [`RocksDBProvider`] to use instead of creating one during launch.
///
/// The provider is stored on the wrapped builder's `rocksdb_provider` field, replacing
/// any value set earlier, and the updated launch context is returned for chaining.
pub fn with_rocksdb_provider(mut self, rocksdb_provider: RocksDBProvider) -> Self {
self.builder.rocksdb_provider = Some(rocksdb_provider);
self
}
/// Configures the types of the node.
pub fn with_types<T>(self) -> WithLaunchContext<NodeBuilderWithTypes<RethFullAdapter<DB, T>>>
where

View File

@@ -16,7 +16,6 @@ use crate::{
use reth_exex::ExExContext;
use reth_node_api::{FullNodeComponents, FullNodeTypes, NodeAddOns, NodeTypes};
use reth_node_core::node_config::NodeConfig;
use reth_provider::providers::RocksDBProvider;
use reth_tasks::TaskExecutor;
use std::{fmt, fmt::Debug, future::Future};
@@ -26,8 +25,6 @@ pub struct NodeBuilderWithTypes<T: FullNodeTypes> {
config: NodeConfig<<T::Types as NodeTypes>::ChainSpec>,
/// The configured database for the node.
adapter: NodeTypesAdapter<T>,
/// An optional [`RocksDBProvider`] to use instead of creating one during launch.
rocksdb_provider: Option<RocksDBProvider>,
}
impl<T: FullNodeTypes> NodeBuilderWithTypes<T> {
@@ -35,9 +32,8 @@ impl<T: FullNodeTypes> NodeBuilderWithTypes<T> {
pub const fn new(
config: NodeConfig<<T::Types as NodeTypes>::ChainSpec>,
database: T::DB,
rocksdb_provider: Option<RocksDBProvider>,
) -> Self {
Self { config, adapter: NodeTypesAdapter::new(database), rocksdb_provider }
Self { config, adapter: NodeTypesAdapter::new(database) }
}
/// Advances the state of the node builder to the next state where all components are configured
@@ -45,12 +41,11 @@ impl<T: FullNodeTypes> NodeBuilderWithTypes<T> {
where
CB: NodeComponentsBuilder<T>,
{
let Self { config, adapter, rocksdb_provider } = self;
let Self { config, adapter } = self;
NodeBuilderWithComponents {
config,
adapter,
rocksdb_provider,
components_builder,
add_ons: AddOns { hooks: NodeHooks::default(), exexs: Vec::new(), add_ons: () },
}
@@ -155,8 +150,6 @@ pub struct NodeBuilderWithComponents<
pub config: NodeConfig<<T::Types as NodeTypes>::ChainSpec>,
/// Adapter for the underlying node types and database
pub adapter: NodeTypesAdapter<T>,
/// An optional [`RocksDBProvider`] to use instead of creating one during launch.
pub rocksdb_provider: Option<RocksDBProvider>,
/// container for type specific components
pub components_builder: CB,
/// Additional node extensions.
@@ -174,12 +167,11 @@ where
where
AO: NodeAddOns<NodeAdapter<T, CB::Components>>,
{
let Self { config, adapter, rocksdb_provider, components_builder, .. } = self;
let Self { config, adapter, components_builder, .. } = self;
NodeBuilderWithComponents {
config,
adapter,
rocksdb_provider,
components_builder,
add_ons: AddOns { hooks: NodeHooks::default(), exexs: Vec::new(), add_ons },
}

View File

@@ -472,7 +472,6 @@ where
pub async fn create_provider_factory<N, Evm>(
&self,
changeset_cache: ChangesetCache,
rocksdb_provider: Option<RocksDBProvider>,
) -> eyre::Result<ProviderFactory<N>>
where
N: ProviderNodeTypes<DB = DB, ChainSpec = ChainSpec>,
@@ -490,16 +489,12 @@ where
.with_genesis_block_number(self.chain_spec().genesis().number.unwrap_or_default())
.build()?;
// Use the provided RocksDB provider or create a new one
let rocksdb_provider = if let Some(provider) = rocksdb_provider {
provider
} else {
RocksDBProvider::builder(self.data_dir().rocksdb())
.with_default_tables()
.with_metrics()
.with_statistics()
.build()?
};
// Initialize RocksDB provider with metrics, statistics, and default tables
let rocksdb_provider = RocksDBProvider::builder(self.data_dir().rocksdb())
.with_default_tables()
.with_metrics()
.with_statistics()
.build()?;
let factory = ProviderFactory::new(
self.right().clone(),
@@ -578,14 +573,12 @@ where
pub async fn with_provider_factory<N, Evm>(
self,
changeset_cache: ChangesetCache,
rocksdb_provider: Option<RocksDBProvider>,
) -> eyre::Result<LaunchContextWith<Attached<WithConfigs<ChainSpec>, ProviderFactory<N>>>>
where
N: ProviderNodeTypes<DB = DB, ChainSpec = ChainSpec>,
Evm: ConfigureEvm<Primitives = N::Primitives> + 'static,
{
let factory =
self.create_provider_factory::<N, Evm>(changeset_cache, rocksdb_provider).await?;
let factory = self.create_provider_factory::<N, Evm>(changeset_cache).await?;
let ctx = LaunchContextWith {
inner: self.inner,
attachment: self.attachment.map_right(|_| factory),

View File

@@ -15,7 +15,7 @@ use reth_engine_tree::{
chain::{ChainEvent, FromOrchestrator},
engine::{EngineApiKind, EngineApiRequest, EngineRequestHandler},
launch::build_engine_orchestrator,
tree::{EngineSharedCaches, PayloadSparseTrieKind, TreeConfig},
tree::TreeConfig,
};
use reth_engine_util::EngineMessageStreamExt;
use reth_exex::ExExManagerHandle;
@@ -81,7 +81,6 @@ impl EngineNodeLauncher {
let Self { ctx, engine_tree_config } = self;
let NodeBuilderWithComponents {
adapter: NodeTypesAdapter { database },
rocksdb_provider,
components_builder,
add_ons: AddOns { hooks, exexs: installed_exex, add_ons },
config,
@@ -90,10 +89,6 @@ impl EngineNodeLauncher {
// Create changeset cache that will be shared across the engine
let changeset_cache = ChangesetCache::new();
let main_shared_caches =
EngineSharedCaches::<<CB::Components as NodeComponents<T>>::Evm>::with_sparse_trie_kind(
PayloadSparseTrieKind::from(engine_tree_config.enable_arena_sparse_trie()),
);
// setup the launch context
let ctx = ctx
@@ -107,7 +102,7 @@ impl EngineNodeLauncher {
// ensure certain settings take effect
.with_adjusted_configs()
// Create the provider factory with changeset cache
.with_provider_factory::<_, <CB::Components as NodeComponents<T>>::Evm>(changeset_cache.clone(), rocksdb_provider).await?
.with_provider_factory::<_, <CB::Components as NodeComponents<T>>::Evm>(changeset_cache.clone()).await?
.inspect(|_| {
info!(target: "reth::cli", "Database opened");
})
@@ -127,8 +122,7 @@ impl EngineNodeLauncher {
.with_blockchain_db::<T, _>(move |provider_factory| {
Ok(BlockchainProvider::new(provider_factory)?)
})?
.with_components(components_builder, on_component_initialized)
.await?;
.with_components(components_builder, on_component_initialized).await?;
// spawn exexs if any
let maybe_exex_manager_handle = ctx.launch_exex(installed_exex).await?;
@@ -199,12 +193,7 @@ impl EngineNodeLauncher {
// Build the engine validator with all required components
let engine_validator = validator_builder
.clone()
.build_tree_validator_with_caches(
&add_ons_ctx,
engine_tree_config.clone(),
changeset_cache.clone(),
main_shared_caches.clone(),
)
.build_tree_validator(&add_ons_ctx, engine_tree_config.clone(), changeset_cache.clone())
.await?;
// Create the consensus engine stream with optional reorg
@@ -217,18 +206,8 @@ impl EngineNodeLauncher {
|| async {
// Create a separate cache for reorg validator (not shared with main engine)
let reorg_cache = ChangesetCache::new();
let reorg_shared_caches = EngineSharedCaches::<
<CB::Components as NodeComponents<T>>::Evm,
>::with_sparse_trie_kind(
PayloadSparseTrieKind::from(engine_tree_config.enable_arena_sparse_trie()),
);
validator_builder
.build_tree_validator_with_caches(
&add_ons_ctx,
engine_tree_config.clone(),
reorg_cache,
reorg_shared_caches,
)
.build_tree_validator(&add_ons_ctx, engine_tree_config.clone(), reorg_cache)
.await
},
node_config.debug.reorg_frequency,
@@ -437,7 +416,10 @@ impl EngineNodeLauncher {
ctx.spawn_ethstats(engine_events_for_ethstats).await?;
let handle = NodeHandle {
node_exit_future: NodeExitFuture::new(async { rx.await? }),
node_exit_future: NodeExitFuture::new(
async { rx.await? },
full_node.config.debug.terminate,
),
node: full_node,
};

View File

@@ -1,8 +1,8 @@
//! Builder support for rpc components.
pub use jsonrpsee::server::middleware::rpc::{RpcService, RpcServiceBuilder};
use reth_engine_tree::tree::WaitForCaches;
pub use reth_engine_tree::tree::{BasicEngineValidator, EngineValidator};
use reth_engine_tree::tree::{EngineSharedCaches, PayloadSparseTrieKind, WaitForCaches};
pub use reth_rpc_builder::{middleware::RethRpcMiddleware, Identity, Stack};
pub use reth_trie_db::ChangesetCache;
@@ -981,8 +981,7 @@ where
let Self { eth_api_builder, engine_api_builder, hooks, .. } = self;
let engine_api = engine_api_builder.build_engine_api(&ctx).await?;
let AddOnsContext { node, config, beacon_engine_handle, jwt_secret, engine_events, .. } =
ctx;
let AddOnsContext { node, config, beacon_engine_handle, jwt_secret, engine_events } = ctx;
info!(target: "reth::cli", "Engine API handler initialized");
@@ -1295,25 +1294,6 @@ pub trait EngineValidatorBuilder<Node: FullNodeComponents>: Send + Sync + Clone
tree_config: TreeConfig,
changeset_cache: ChangesetCache,
) -> impl Future<Output = eyre::Result<Self::EngineValidator>> + Send;
/// Builds the tree validator using the shared cache handles exported by the launcher.
///
/// The default implementation preserves the legacy behavior and ignores the provided caches:
/// it simply forwards `ctx`, `tree_config` and `changeset_cache` to
/// [`Self::build_tree_validator`]. Implementations that want to make use of `shared_caches`
/// have to override this method.
fn build_tree_validator_with_caches(
self,
ctx: &AddOnsContext<'_, Node>,
tree_config: TreeConfig,
changeset_cache: ChangesetCache,
shared_caches: EngineSharedCaches<Node::Evm>,
) -> impl Future<Output = eyre::Result<Self::EngineValidator>> + Send
where
Self: Sized,
{
async move {
// The caches are deliberately left unused by the default implementation.
let _ = shared_caches;
// Fall back to the builder that does not take shared caches.
self.build_tree_validator(ctx, tree_config, changeset_cache).await
}
}
}
/// Basic implementation of [`EngineValidatorBuilder`].
@@ -1361,20 +1341,6 @@ where
ctx: &AddOnsContext<'_, Node>,
tree_config: TreeConfig,
changeset_cache: ChangesetCache,
) -> eyre::Result<Self::EngineValidator> {
let shared_caches = EngineSharedCaches::with_sparse_trie_kind(PayloadSparseTrieKind::from(
tree_config.enable_arena_sparse_trie(),
));
self.build_tree_validator_with_caches(ctx, tree_config, changeset_cache, shared_caches)
.await
}
async fn build_tree_validator_with_caches(
self,
ctx: &AddOnsContext<'_, Node>,
tree_config: TreeConfig,
changeset_cache: ChangesetCache,
shared_caches: EngineSharedCaches<Node::Evm>,
) -> eyre::Result<Self::EngineValidator> {
let validator = self.payload_validator_builder.build(ctx).await?;
let data_dir = ctx.config.datadir.clone().resolve_datadir(ctx.config.chain.chain());
@@ -1387,7 +1353,6 @@ where
validator,
tree_config,
invalid_block_hook,
shared_caches,
changeset_cache,
ctx.node.task_executor().clone(),
))

View File

@@ -93,6 +93,12 @@ min-trace-logs = ["tracing/release_max_level_trace"]
# Debug recording for sparse trie mutations
trie-debug = ["reth-engine-primitives/trie-debug"]
# Route supported tables to RocksDB instead of MDBX
rocksdb = ["reth-storage-api/rocksdb"]
# Marker feature for edge/unstable builds - enables rocksdb and sets v2 defaults
edge = ["rocksdb"]
[build-dependencies]
vergen = { workspace = true, features = ["build", "cargo", "emit_and_set"] }
vergen-git2.workspace = true

View File

@@ -383,11 +383,6 @@ pub struct EngineArgs {
#[arg(long = "engine.disable-sparse-trie-cache-pruning", default_value_t = DefaultEngineValues::get_global().disable_sparse_trie_cache_pruning)]
pub disable_sparse_trie_cache_pruning: bool,
/// Enable the arena-based sparse trie implementation instead of the default hash-map-based
/// one.
#[arg(long = "engine.enable-arena-sparse-trie", default_value_t = false)]
pub enable_arena_sparse_trie: bool,
/// Configure the timeout for the state root task before spawning a sequential fallback.
/// If the state root task takes longer than this, a sequential computation starts in
/// parallel and whichever finishes first is used.
@@ -474,7 +469,6 @@ impl Default for EngineArgs {
sparse_trie_max_hot_accounts,
slow_block_threshold,
disable_sparse_trie_cache_pruning,
enable_arena_sparse_trie: false,
state_root_task_timeout: state_root_task_timeout
.as_deref()
.map(|s| humantime::parse_duration(s).expect("valid default duration")),
@@ -509,7 +503,6 @@ impl EngineArgs {
.with_sparse_trie_max_hot_accounts(self.sparse_trie_max_hot_accounts)
.with_slow_block_threshold(self.slow_block_threshold)
.with_disable_sparse_trie_cache_pruning(self.disable_sparse_trie_cache_pruning)
.with_enable_arena_sparse_trie(self.enable_arena_sparse_trie)
.with_state_root_task_timeout(self.state_root_task_timeout.filter(|d| !d.is_zero()));
#[cfg(feature = "trie-debug")]
let config = config.with_proof_jitter(self.proof_jitter);
@@ -567,7 +560,6 @@ mod tests {
sparse_trie_max_hot_accounts: 500,
slow_block_threshold: None,
disable_sparse_trie_cache_pruning: true,
enable_arena_sparse_trie: true,
state_root_task_timeout: Some(Duration::from_secs(2)),
#[cfg(feature = "trie-debug")]
proof_jitter: None,
@@ -607,7 +599,6 @@ mod tests {
"--engine.sparse-trie-max-hot-accounts",
"500",
"--engine.disable-sparse-trie-cache-pruning",
"--engine.enable-arena-sparse-trie",
"--engine.state-root-task-timeout",
"2s",
])

View File

@@ -4,20 +4,26 @@ use clap::{ArgAction, Args};
/// Parameters for storage configuration.
///
/// V2 storage is now the default for all new databases. The `--storage.v2` flag is
/// accepted for backwards compatibility but has no effect — v2 is always used.
///
/// Existing databases always use the settings persisted in their metadata.
/// This controls whether the node uses v2 storage defaults (with `RocksDB` and static file
/// optimizations) or v1/legacy storage defaults.
///
/// Individual storage settings can be overridden with `--static-files.*` and `--rocksdb.*` flags.
#[derive(Debug, Args, PartialEq, Eq, Clone, Copy, Default)]
#[command(next_help_heading = "Storage")]
pub struct StorageArgs {
/// Deprecated no-op: v2 storage is now always enabled for new databases.
/// Enable v2 storage defaults (static files + `RocksDB` routing).
///
/// Kept for backwards compatibility with existing scripts and configurations.
/// Existing databases always use the settings persisted in their metadata.
#[arg(long = "storage.v2", action = ArgAction::SetTrue, hide = true)]
/// When enabled, the node uses optimized storage settings:
/// - Receipts and transaction senders in static files
/// - History indices in `RocksDB` (accounts, storages, transaction hashes)
/// - Account and storage changesets in static files
///
/// This is a genesis-initialization-only setting: changing it after genesis requires a
/// re-sync.
///
/// Individual settings can still be overridden with `--static-files.*` and `--rocksdb.*`
/// flags.
#[arg(long = "storage.v2", action = ArgAction::SetTrue)]
pub v2: bool,
}
@@ -35,13 +41,14 @@ mod tests {
#[test]
fn test_default_storage_args() {
let default_args = StorageArgs::default();
let args = CommandParser::<StorageArgs>::parse_from(["reth"]).args;
assert_eq!(args, StorageArgs::default());
assert_eq!(args, default_args);
assert!(!args.v2);
}
#[test]
fn test_parse_v2_flag_accepted() {
// Flag is accepted for backwards compatibility but is a no-op
fn test_parse_v2_flag() {
let args = CommandParser::<StorageArgs>::parse_from(["reth", "--storage.v2"]).args;
assert!(args.v2);
}

View File

@@ -13,21 +13,27 @@ pub struct NodeExitFuture {
/// The consensus engine future.
/// This can be polled to wait for the consensus engine to exit.
consensus_engine_fut: Option<BoxFuture<'static, eyre::Result<()>>>,
/// Flag indicating whether the node should be terminated after the pipeline sync.
terminate: bool,
}
impl fmt::Debug for NodeExitFuture {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_struct("NodeExitFuture").field("consensus_engine_fut", &"...").finish()
f.debug_struct("NodeExitFuture")
.field("consensus_engine_fut", &"...")
.field("terminate", &self.terminate)
.finish()
}
}
impl NodeExitFuture {
/// Create a new `NodeExitFuture`.
pub fn new<F>(consensus_engine_fut: F) -> Self
pub fn new<F>(consensus_engine_fut: F, terminate: bool) -> Self
where
F: Future<Output = eyre::Result<()>> + 'static + Send,
{
Self { consensus_engine_fut: Some(Box::pin(consensus_engine_fut)) }
Self { consensus_engine_fut: Some(Box::pin(consensus_engine_fut)), terminate }
}
}
@@ -40,7 +46,11 @@ impl Future for NodeExitFuture {
match ready!(rx.poll_unpin(cx)) {
Ok(_) => {
this.consensus_engine_fut.take();
Poll::Ready(Ok(()))
if this.terminate {
Poll::Ready(Ok(()))
} else {
Poll::Pending
}
}
Err(err) => Poll::Ready(Err(err)),
}
@@ -53,15 +63,28 @@ impl Future for NodeExitFuture {
#[cfg(test)]
mod tests {
use super::*;
use std::future::poll_fn;
#[tokio::test]
async fn test_node_exit_future() {
async fn test_node_exit_future_terminate_true() {
let fut = async { Ok(()) };
let node_exit_future = NodeExitFuture::new(fut);
let node_exit_future = NodeExitFuture::new(fut, true);
let res = node_exit_future.await;
assert!(res.is_ok());
}
/// With `terminate` set to `false`, polling the exit future once must report `Pending`
/// even though the wrapped consensus engine future resolves successfully right away.
#[tokio::test]
async fn test_node_exit_future_terminate_false() {
    let engine_fut = async { Ok(()) };
    let mut exit_fut = NodeExitFuture::new(engine_fut, false);

    // Poll exactly once and hand the raw poll result back out of `poll_fn`, so the
    // assertion can be made outside of the polling closure.
    let first_poll = poll_fn(|cx| Poll::Ready(exit_fut.poll_unpin(cx))).await;
    assert!(first_poll.is_pending());
}
}

View File

@@ -371,16 +371,17 @@ impl<ChainSpec> NodeConfig<ChainSpec> {
self.pruning.prune_config(&self.chain)
}
/// Returns the effective storage settings for this node.
/// Returns the effective storage settings derived from `--storage.v2`.
///
/// Always returns [`StorageSettings::v2()`] — v2 storage is the default for
/// new nodes. Existing nodes retain whatever settings are persisted in their
/// database metadata (checked during genesis init).
///
/// Existing databases retain whatever settings are persisted in their
/// metadata (checked during genesis init).
/// The base storage mode is determined by `--storage.v2`:
/// - When `--storage.v2` is set: uses [`StorageSettings::v2()`] defaults
/// - Otherwise: uses [`StorageSettings::base()`] defaults
pub const fn storage_settings(&self) -> StorageSettings {
StorageSettings::v2()
if self.storage.v2 {
StorageSettings::v2()
} else {
StorageSettings::base()
}
}
/// Returns the max block that the node should run to, looking it up from the network if

View File

@@ -612,11 +612,8 @@ where
tokio::spawn(async move {
loop {
let conn_clone = {
let conn_guard = conn_arc.read().await;
conn_guard.as_ref().cloned()
};
if let Some(conn) = conn_clone {
let conn_guard = conn_arc.read().await;
if let Some(conn) = conn_guard.as_ref() {
match conn.read_json().await {
Ok(msg) => {
if message_tx.send(msg).await.is_err() {
@@ -629,6 +626,7 @@ where
}
other => {
debug!(target: "ethstats", "Read error: {}", other);
drop(conn_guard);
if let Some(conn) = conn_arc.write().await.take() {
let _ = conn.close().await;
}

View File

@@ -1,7 +1,7 @@
//! Payload builder service metrics.
use reth_metrics::{
metrics::{Counter, Gauge, Histogram},
metrics::{Counter, Gauge},
Metrics,
};
@@ -23,10 +23,6 @@ pub(crate) struct PayloadBuilderServiceMetrics {
pub(crate) resolved_revenue: Gauge,
/// Current block returned as the resolved payload
pub(crate) resolved_block: Gauge,
/// Histogram of payload resolve latency in seconds
pub(crate) resolve_duration_seconds: Histogram,
/// Histogram of new payload job creation latency in seconds
pub(crate) new_job_duration_seconds: Histogram,
}
impl PayloadBuilderServiceMetrics {

View File

@@ -14,7 +14,7 @@ use futures_util::{future::FutureExt, Stream, StreamExt};
use reth_chain_state::CanonStateNotification;
use reth_payload_builder_primitives::{Events, PayloadBuilderError, PayloadEvents};
use reth_payload_primitives::{BuiltPayload, PayloadBuilderAttributes, PayloadKind, PayloadTypes};
use reth_primitives_traits::{FastInstant as Instant, NodePrimitives};
use reth_primitives_traits::NodePrimitives;
use std::{
fmt,
future::Future,
@@ -84,7 +84,12 @@ where
) -> Option<Result<u64, PayloadBuilderError>> {
self.inner.payload_timestamp(id).await
}
}
impl<T> PayloadStore<T>
where
T: PayloadTypes,
{
/// Create a new instance
pub fn new(inner: PayloadBuilderHandle<T>) -> Self {
Self { inner: Arc::new(inner) }
@@ -301,13 +306,11 @@ where
id: PayloadId,
kind: PayloadKind,
) -> Option<PayloadFuture<T::BuiltPayload>> {
let start = Instant::now();
debug!(target: "payload_builder", %id, "resolving payload job");
if let Some((cached, _, payload)) = &*self.cached_payload_rx.borrow() &&
*cached == id
{
self.metrics.resolve_duration_seconds.record(start.elapsed());
return Some(Box::pin(core::future::ready(Ok(payload.clone()))));
}
@@ -328,7 +331,6 @@ where
let fut = async move {
let res = fut.await;
resolved_metrics.resolve_duration_seconds.record(start.elapsed());
if let Ok(payload) = &res {
if payload_events.receiver_count() > 0 {
payload_events.send(Events::BuiltPayload(payload.clone().into())).ok();
@@ -346,7 +348,15 @@ where
Some(Box::pin(fut))
}
}
impl<Gen, St, T> PayloadBuilderService<Gen, St, T>
where
T: PayloadTypes,
Gen: PayloadJobGenerator,
Gen::Job: PayloadJob<PayloadAttributes = T::PayloadBuilderAttributes>,
<Gen::Job as PayloadJob>::BuiltPayload: Into<T::BuiltPayload>,
{
/// Returns the payload timestamp for the given payload.
fn payload_timestamp(&self, id: PayloadId) -> Option<Result<u64, PayloadBuilderError>> {
if let Some((cached_id, timestamp, _)) = *self.cached_payload_rx.borrow() &&
@@ -430,7 +440,6 @@ where
debug!(target: "payload_builder",%id, parent = %attr.parent(), "Payload job already in progress, ignoring.");
} else {
let parent = attr.parent();
let start = Instant::now();
let job_result = {
let _entered = job_span.enter();
this.generator.new_payload_job(attr.clone())
@@ -438,7 +447,6 @@ where
match job_result {
Ok(job) => {
this.metrics.new_job_duration_seconds.record(start.elapsed());
info!(target: "payload_builder", %id, %parent, "New payload job created");
this.metrics.inc_initiated_jobs();
new_job = true;
@@ -448,18 +456,15 @@ where
// Clear stale cached payload for this id so
// resolve() never returns an outdated result
// from a previous job with the same id.
if this
.cached_payload_rx
.borrow()
.as_ref()
.is_some_and(|(cached_id, _, _)| *cached_id == id)
if let Some((cached_id, _, _)) =
&*this.cached_payload_rx.borrow() &&
*cached_id == id
{
trace!(target: "payload_builder", %id, "clearing stale cached payload for reused payload id");
let _ = this.cached_payload_tx.send(None);
}
}
Err(err) => {
this.metrics.new_job_duration_seconds.record(start.elapsed());
this.metrics.inc_failed_jobs();
warn!(target: "payload_builder", %err, %id, "Failed to create payload builder job");
res = Err(err);

View File

@@ -139,7 +139,9 @@ impl NewPayloadError {
pub fn other(err: impl error::Error + Send + Sync + 'static) -> Self {
Self::Other(Box::new(err))
}
}
impl NewPayloadError {
/// Returns `true` if the error is caused by a block hash mismatch.
#[inline]
pub const fn is_block_hash_mismatch(&self) -> bool {

View File

@@ -42,10 +42,13 @@ rayon.workspace = true
tokio.workspace = true
rustc-hash.workspace = true
[features]
rocksdb = ["reth-provider/rocksdb"]
[dev-dependencies]
# reth
reth-db = { workspace = true, features = ["test-utils"] }
reth-stages = { workspace = true, features = ["test-utils"] }
reth-stages = { workspace = true, features = ["test-utils", "rocksdb"] }
reth-primitives-traits = { workspace = true, features = ["arbitrary"] }
reth-testing-utils.workspace = true
reth-tracing.workspace = true

View File

@@ -74,6 +74,7 @@ where
let range_end = *range.end();
// Check where account history indices are stored
#[cfg(all(unix, feature = "rocksdb"))]
if provider.cached_storage_settings().storage_v2 {
return self.prune_rocksdb(provider, input, range, range_end);
}
@@ -231,6 +232,7 @@ impl AccountHistory {
///
/// Reads account changesets from static files and prunes the corresponding
/// `RocksDB` history shards.
#[cfg(all(unix, feature = "rocksdb"))]
fn prune_rocksdb<Provider>(
&self,
provider: &Provider,
@@ -504,6 +506,157 @@ mod tests {
test_prune(1400, 3, (PruneProgress::Finished, 804));
}
/// Tests the `prune_static_files` code path. On unix with rocksdb feature, v2 storage
/// routes to `prune_rocksdb` instead, so this test only runs without rocksdb (the
/// `prune_rocksdb_path` test covers that configuration).
///
/// The account history segment is pruned three times against the same database. After each
/// run the remaining `AccountsHistory` shards and the saved prune checkpoint are compared
/// with values recomputed from the generated changesets.
#[test]
#[cfg(not(all(unix, feature = "rocksdb")))]
fn prune_static_file() {
let db = TestStageDB::default();
let mut rng = generators::rng();
// Random chain covering blocks 0..=5000.
let blocks = random_block_range(
&mut rng,
0..=5000,
BlockRangeParams { parent: Some(B256::ZERO), tx_count: 0..1, ..Default::default() },
);
db.insert_blocks(blocks.iter(), StorageKind::Database(None)).expect("insert blocks");
// Only two accounts are generated.
// NOTE(review): the two `0..0` ranges presumably disable storage changes for these
// accounts — confirm against `random_changeset_range`.
let accounts = random_eoa_accounts(&mut rng, 2).into_iter().collect::<BTreeMap<_, _>>();
let (changesets, _) = random_changeset_range(
&mut rng,
blocks.iter(),
accounts.into_iter().map(|(addr, acc)| (addr, (acc, Vec::new()))),
0..0,
0..0,
);
// Changesets go to static files; the history index is inserted separately.
db.insert_changesets_to_static_files(changesets.clone(), None)
.expect("insert changesets to static files");
db.insert_history(changesets.clone(), None).expect("insert history");
// Count how many `AccountsHistory` entries exist per account key.
let account_occurrences = db.table::<tables::AccountsHistory>().unwrap().into_iter().fold(
BTreeMap::<_, usize>::new(),
|mut map, (key, _)| {
map.entry(key.key).or_default().add_assign(1);
map
},
);
// Sanity check: at least one account must have more than one history entry.
assert!(account_occurrences.into_iter().any(|(_, occurrences)| occurrences > 1));
// Snapshot of the shards before any pruning; expected results are derived from it.
let original_shards = db.table::<tables::AccountsHistory>().unwrap();
// Runs one prune pass up to `to_block` and verifies its outcome. `run` is the 1-based
// index of the pass and `expected_result` is the expected `(progress, pruned)` pair.
let test_prune =
|to_block: BlockNumber, run: usize, expected_result: (PruneProgress, usize)| {
let prune_mode = PruneMode::Before(to_block);
let deleted_entries_limit = 2000;
let mut limiter =
PruneLimiter::default().set_deleted_entries_limit(deleted_entries_limit);
// Resume from whatever checkpoint the previous pass saved.
let input = PruneInput {
previous_checkpoint: db
.factory
.provider()
.unwrap()
.get_prune_checkpoint(PruneSegment::AccountHistory)
.unwrap(),
to_block,
limiter: limiter.clone(),
};
let segment = AccountHistory::new(prune_mode);
let provider = db.factory.database_provider_rw().unwrap();
// Select v2 storage settings so that `prune` takes the v2 branch.
provider.set_storage_settings_cache(StorageSettings::v2());
let result = segment.prune(&provider, input).unwrap();
limiter.increment_deleted_entries_count_by(result.pruned);
// The pass must report the expected progress and pruned count and yield a checkpoint.
assert_matches!(
result,
SegmentOutput {progress, pruned, checkpoint: Some(_)}
if (progress, pruned) == expected_result
);
// Persist the checkpoint so that the next pass continues from it.
segment
.save_checkpoint(
&provider,
result.checkpoint.unwrap().as_prune_checkpoint(prune_mode),
)
.unwrap();
provider.commit().expect("commit");
// Flatten the changesets into `(block_number, change)` pairs; the block number is the
// position of the changeset in the generated list.
let changesets = changesets
.iter()
.enumerate()
.flat_map(|(block_number, changeset)| {
changeset.iter().map(move |change| (block_number, change))
})
.collect::<Vec<_>>();
// Index of the first flattened entry that lies beyond either the cumulative entry
// budget of the passes so far or `to_block`.
// NOTE(review): the limit is divided by `ACCOUNT_HISTORY_TABLES_TO_PRUNE`, presumably
// because each pruned changeset entry counts against several tables — confirm.
#[expect(clippy::skip_while_next)]
let pruned = changesets
.iter()
.enumerate()
.skip_while(|(i, (block_number, _))| {
*i < deleted_entries_limit / ACCOUNT_HISTORY_TABLES_TO_PRUNE * run &&
*block_number <= to_block as usize
})
.next()
.map(|(i, _)| i)
.unwrap_or_default();
// Skip what we've pruned so far, subtracting one to get last pruned block number
// further down
let mut pruned_changesets = changesets.iter().skip(pruned.saturating_sub(1));
// For an unfinished pass the expected checkpoint is one block earlier than the block of
// the last pruned entry; if no entry is left, fall back to `to_block`.
let last_pruned_block_number = pruned_changesets
.next()
.map(|(block_number, _)| {
(if result.progress.is_finished() {
*block_number
} else {
block_number.saturating_sub(1)
}) as BlockNumber
})
.unwrap_or(to_block);
let actual_shards = db.table::<tables::AccountsHistory>().unwrap();
// Expected shards: drop shards whose highest block is at or below the last pruned block,
// and strip block numbers up to and including it from the shards that remain.
let expected_shards = original_shards
.iter()
.filter(|(key, _)| key.highest_block_number > last_pruned_block_number)
.map(|(key, blocks)| {
let new_blocks =
blocks.iter().skip_while(|block| *block <= last_pruned_block_number);
(key.clone(), BlockNumberList::new_pre_sorted(new_blocks))
})
.collect::<Vec<_>>();
assert_eq!(actual_shards, expected_shards);
// The persisted checkpoint must point at the recomputed last pruned block.
assert_eq!(
db.factory
.provider()
.unwrap()
.get_prune_checkpoint(PruneSegment::AccountHistory)
.unwrap(),
Some(PruneCheckpoint {
block_number: Some(last_pruned_block_number),
tx_number: None,
prune_mode
})
);
};
// Pass 1: expected to stop early on the deleted entries limit.
test_prune(
998,
1,
(PruneProgress::HasMoreData(PruneInterruptReason::DeletedEntriesLimitReached), 1000),
);
// Pass 2: same target block, expected to finish.
test_prune(998, 2, (PruneProgress::Finished, 1000));
// Pass 3: higher target block, expected to finish in one pass.
test_prune(1400, 3, (PruneProgress::Finished, 804));
}
#[cfg(all(unix, feature = "rocksdb"))]
#[test]
fn prune_rocksdb_path() {
use reth_db_api::models::ShardedKey;

View File

@@ -75,6 +75,7 @@ where
let range_end = *range.end();
// Check where storage history indices are stored
#[cfg(all(unix, feature = "rocksdb"))]
if provider.cached_storage_settings().storage_v2 {
return self.prune_rocksdb(provider, input, range, range_end);
}
@@ -235,6 +236,7 @@ impl StorageHistory {
///
/// Reads storage changesets from static files and prunes the corresponding
/// `RocksDB` history shards.
#[cfg(all(unix, feature = "rocksdb"))]
fn prune_rocksdb<Provider>(
&self,
provider: &Provider,
@@ -516,6 +518,159 @@ mod tests {
test_prune(1200, 3, (PruneProgress::Finished, 202));
}
/// Tests the `prune_static_files` code path. On unix with rocksdb feature, v2 storage
/// routes to `prune_rocksdb` instead, so this test only runs without rocksdb (the
/// `prune_rocksdb_path` test covers that configuration).
///
/// The storage history segment is pruned three times against the same database. After each
/// run the remaining `StoragesHistory` shards and the saved prune checkpoint are compared
/// with values recomputed from the generated changesets.
#[test]
#[cfg(not(all(unix, feature = "rocksdb")))]
fn prune_static_file() {
let db = TestStageDB::default();
let mut rng = generators::rng();
// Random chain covering blocks 0..=5000.
let blocks = random_block_range(
&mut rng,
0..=5000,
BlockRangeParams { parent: Some(B256::ZERO), tx_count: 0..1, ..Default::default() },
);
db.insert_blocks(blocks.iter(), StorageKind::Database(None)).expect("insert blocks");
// Only two accounts are generated.
// NOTE(review): the two `1..2` ranges presumably request exactly one storage change per
// account — confirm against `random_changeset_range`.
let accounts = random_eoa_accounts(&mut rng, 2).into_iter().collect::<BTreeMap<_, _>>();
let (changesets, _) = random_changeset_range(
&mut rng,
blocks.iter(),
accounts.into_iter().map(|(addr, acc)| (addr, (acc, Vec::new()))),
1..2,
1..2,
);
// Changesets go to static files; the history index is inserted separately.
db.insert_changesets_to_static_files(changesets.clone(), None)
.expect("insert changesets to static files");
db.insert_history(changesets.clone(), None).expect("insert history");
// Count how many `StoragesHistory` entries exist per `(address, storage key)` pair.
let storage_occurrences = db.table::<tables::StoragesHistory>().unwrap().into_iter().fold(
BTreeMap::<_, usize>::new(),
|mut map, (key, _)| {
map.entry((key.address, key.sharded_key.key)).or_default().add_assign(1);
map
},
);
// Sanity check: at least one storage slot must have more than one history entry.
assert!(storage_occurrences.into_iter().any(|(_, occurrences)| occurrences > 1));
// Snapshot of the shards before any pruning; expected results are derived from it.
let original_shards = db.table::<tables::StoragesHistory>().unwrap();
// Runs one prune pass up to `to_block` and verifies its outcome. `run` is the 1-based
// index of the pass and `expected_result` is the expected `(progress, pruned)` pair.
let test_prune = |to_block: BlockNumber,
run: usize,
expected_result: (PruneProgress, usize)| {
let prune_mode = PruneMode::Before(to_block);
let deleted_entries_limit = 1000;
let mut limiter =
PruneLimiter::default().set_deleted_entries_limit(deleted_entries_limit);
// Resume from whatever checkpoint the previous pass saved.
let input = PruneInput {
previous_checkpoint: db
.factory
.provider()
.unwrap()
.get_prune_checkpoint(PruneSegment::StorageHistory)
.unwrap(),
to_block,
limiter: limiter.clone(),
};
let segment = StorageHistory::new(prune_mode);
let provider = db.factory.database_provider_rw().unwrap();
// Select v2 storage settings so that `prune` takes the v2 branch.
provider.set_storage_settings_cache(StorageSettings::v2());
let result = segment.prune(&provider, input).unwrap();
limiter.increment_deleted_entries_count_by(result.pruned);
// The pass must report the expected progress and pruned count and yield a checkpoint.
assert_matches!(
result,
SegmentOutput {progress, pruned, checkpoint: Some(_)}
if (progress, pruned) == expected_result
);
// Persist the checkpoint so that the next pass continues from it.
segment
.save_checkpoint(
&provider,
result.checkpoint.unwrap().as_prune_checkpoint(prune_mode),
)
.unwrap();
provider.commit().expect("commit");
// Flatten the changesets into `(block_number, address, entry)` triples; the block number
// is the position of the changeset in the generated list.
let changesets = changesets
.iter()
.enumerate()
.flat_map(|(block_number, changeset)| {
changeset.iter().flat_map(move |(address, _, entries)| {
entries.iter().map(move |entry| (block_number, address, entry))
})
})
.collect::<Vec<_>>();
// Index of the first flattened entry that lies beyond either the cumulative entry
// budget of the passes so far or `to_block`.
// NOTE(review): the limit is divided by `STORAGE_HISTORY_TABLES_TO_PRUNE`, presumably
// because each pruned changeset entry counts against several tables — confirm.
#[expect(clippy::skip_while_next)]
let pruned = changesets
.iter()
.enumerate()
.skip_while(|(i, (block_number, _, _))| {
*i < deleted_entries_limit / STORAGE_HISTORY_TABLES_TO_PRUNE * run &&
*block_number <= to_block as usize
})
.next()
.map(|(i, _)| i)
.unwrap_or_default();
// Skip what we've pruned so far, subtracting one to get last pruned block number
// further down
let mut pruned_changesets = changesets.iter().skip(pruned.saturating_sub(1));
// For an unfinished pass the expected checkpoint is one block earlier than the block of
// the last pruned entry; if no entry is left, fall back to `to_block`.
let last_pruned_block_number = pruned_changesets
.next()
.map(|(block_number, _, _)| {
(if result.progress.is_finished() {
*block_number
} else {
block_number.saturating_sub(1)
}) as BlockNumber
})
.unwrap_or(to_block);
let actual_shards = db.table::<tables::StoragesHistory>().unwrap();
// Expected shards: drop shards whose highest block is at or below the last pruned block,
// and strip block numbers up to and including it from the shards that remain.
let expected_shards = original_shards
.iter()
.filter(|(key, _)| key.sharded_key.highest_block_number > last_pruned_block_number)
.map(|(key, blocks)| {
let new_blocks =
blocks.iter().skip_while(|block| *block <= last_pruned_block_number);
(key.clone(), BlockNumberList::new_pre_sorted(new_blocks))
})
.collect::<Vec<_>>();
assert_eq!(actual_shards, expected_shards);
// The persisted checkpoint must point at the recomputed last pruned block.
assert_eq!(
db.factory
.provider()
.unwrap()
.get_prune_checkpoint(PruneSegment::StorageHistory)
.unwrap(),
Some(PruneCheckpoint {
block_number: Some(last_pruned_block_number),
tx_number: None,
prune_mode
})
);
};
// Pass 1: expected to stop early on the deleted entries limit.
test_prune(
998,
1,
(PruneProgress::HasMoreData(PruneInterruptReason::DeletedEntriesLimitReached), 500),
);
// Pass 2: same target block, expected to finish.
test_prune(998, 2, (PruneProgress::Finished, 500));
// Pass 3: higher target block, expected to finish in one pass.
test_prune(1200, 3, (PruneProgress::Finished, 202));
}
/// Tests that when a limiter stops mid-block (with multiple storage changes for the same
/// block), the checkpoint is set to `block_number - 1` to avoid dangling index entries.
#[test]
@@ -666,6 +821,7 @@ mod tests {
assert!(final_changesets.is_empty(), "All changesets up to block 10 should be pruned");
}
#[cfg(all(unix, feature = "rocksdb"))]
#[test]
fn prune_rocksdb() {
use reth_db_api::models::storage_sharded_key::StorageShardedKey;

View File

@@ -95,6 +95,7 @@ where
.into_inner();
// Check where transaction hash numbers are stored
#[cfg(all(unix, feature = "rocksdb"))]
if provider.cached_storage_settings().storage_v2 {
return self.prune_rocksdb(provider, input, start, end);
}
@@ -195,6 +196,7 @@ impl TransactionLookup {
///
/// Reads transactions from static files and deletes corresponding entries
/// from the `RocksDB` `TransactionHashNumbers` table.
#[cfg(all(unix, feature = "rocksdb"))]
fn prune_rocksdb<Provider>(
&self,
provider: &Provider,
@@ -436,6 +438,7 @@ mod tests {
test_prune(10, (PruneProgress::Finished, 8));
}
#[cfg(all(unix, feature = "rocksdb"))]
#[test]
fn prune_rocksdb() {
use reth_db_api::models::StorageSettings;
@@ -536,6 +539,7 @@ mod tests {
/// 1. Some transactions have already been pruned (checkpoint at tx 5)
/// 2. The deleted entries limit is exhausted before any new deletions
/// 3. The checkpoint should NOT advance to the next start position
#[cfg(all(unix, feature = "rocksdb"))]
#[test]
fn prune_rocksdb_zero_deleted_checkpoint() {
use reth_db_api::models::StorageSettings;

View File

@@ -73,7 +73,7 @@ pub trait EthCall: EstimateCall + Call + LoadPendingBlock + LoadBlock + FullEthA
) -> impl Future<Output = SimulatedBlocksResult<Self::NetworkTypes, Self::Error>> + Send {
async move {
if payload.block_state_calls.len() > self.max_simulate_blocks() as usize {
return Err(EthApiError::other(EthSimulateError::TooManyBlocks).into())
return Err(EthApiError::InvalidParams("too many blocks.".to_string()).into())
}
let block = block.unwrap_or_default();
@@ -491,15 +491,6 @@ pub trait EthCall: EstimateCall + Call + LoadPendingBlock + LoadBlock + FullEthA
// Disabled because eth_createAccessList is sometimes used with non-eoa senders
evm_env.cfg_env.disable_eip3607 = true;
// Disable additional fee charges (e.g. L2 operator fees),
// consistent with prepare_call_env and estimate_gas_with.
evm_env.cfg_env.disable_fee_charge = true;
// Disable EIP-7825 transaction gas limit cap so that the gas limit
// fallback (block gas limit) is not rejected when it exceeds the
// per-tx cap (2^24 ≈ 16.7M post-Osaka).
evm_env.cfg_env.tx_gas_limit_cap = Some(u64::MAX);
if request.as_ref().gas_limit().is_none() && tx_env.gas_price() > 0 {
let cap = this.caller_gas_allowance(&mut db, &evm_env, &tx_env)?;
// no gas limit was provided in the request, so we need to cap the request's gas

View File

@@ -104,16 +104,11 @@ where
fork_timestamps.sort_unstable();
fork_timestamps.dedup();
let current_fork_idx = match fork_timestamps.iter().position(|ts| &latest.timestamp() < ts)
{
// All forks are in the past, use the last one.
None => fork_timestamps.len().checked_sub(1),
// First fork hasn't activated yet — no active timestamp fork.
Some(0) => None,
// Found a future fork; current is the one right before it.
Some(idx) => Some(idx - 1),
};
let (current_fork_idx, current_fork_timestamp) = current_fork_idx
let (current_fork_idx, current_fork_timestamp) = fork_timestamps
.iter()
.position(|ts| &latest.timestamp() < ts)
.and_then(|idx| idx.checked_sub(1))
.or_else(|| fork_timestamps.len().checked_sub(1))
.and_then(|idx| fork_timestamps.get(idx).map(|ts| (idx, *ts)))
.ok_or_else(|| RethError::msg("no active timestamp fork found"))?;

View File

@@ -42,7 +42,6 @@ pub trait EstimateCall: Call {
///
/// - `disable_eip3607` is set to `true`
/// - `disable_base_fee` is set to `true`
/// - `disable_fee_charge` is set to `true`
/// - `nonce` is set to `None`
fn estimate_gas_with<S>(
&self,
@@ -63,10 +62,6 @@ pub trait EstimateCall: Call {
// <https://github.com/ethereum/go-ethereum/blob/ee8e83fa5f6cb261dad2ed0a7bbcde4930c41e6c/internal/ethapi/api.go#L985>
evm_env.cfg_env.disable_base_fee = true;
// Disable additional fee charges (e.g. L2 operator fees) for gas estimation,
// consistent with `prepare_call_env` for `eth_call`.
evm_env.cfg_env.disable_fee_charge = true;
// set nonce to None so that the correct nonce is chosen by the EVM
request.as_mut().take_nonce();

View File

@@ -47,9 +47,6 @@ pub enum EthSimulateError {
/// Total gas limit of transactions for the block exceeds the block gas limit.
#[error("Block gas limit exceeded by the block's transactions")]
BlockGasLimitExceeded,
/// Number of simulated blocks exceeds the configured client limit.
#[error("too many blocks")]
TooManyBlocks,
/// Max gas limit for entire operation exceeded.
#[error("Client adjustable limit reached")]
GasLimitReached,
@@ -119,7 +116,7 @@ impl EthSimulateError {
Self::BlockTimestampInvalid { .. } => -38021,
Self::SenderNotEOA => -38024,
Self::MaxInitCodeSizeExceeded => -38025,
Self::TooManyBlocks | Self::GasLimitReached => -38026,
Self::GasLimitReached => -38026,
Self::NotAPrecompile(_) => -32000,
}
}

View File

@@ -295,7 +295,6 @@ where
fn extract_reward_traces<H: BlockHeader>(
&self,
header: &H,
block_hash: BlockHash,
ommers: Option<&[H]>,
base_block_reward: u128,
) -> Vec<LocalizedTransactionTrace> {
@@ -304,7 +303,6 @@ where
let block_reward = block_reward(base_block_reward, ommers_cnt);
traces.push(reward_trace(
block_hash,
header,
RewardAction {
author: header.beneficiary(),
@@ -318,7 +316,6 @@ where
for uncle in ommers {
let uncle_reward = ommer_reward(base_block_reward, header.number(), uncle.number());
traces.push(reward_trace(
block_hash,
header,
RewardAction {
author: uncle.beneficiary(),
@@ -431,7 +428,6 @@ where
all_traces.extend(
self.extract_reward_traces(
block.header(),
block.hash(),
block.body().ommers(),
base_block_reward,
)
@@ -506,7 +502,6 @@ where
{
traces.extend(self.extract_reward_traces(
block.header(),
block.hash(),
block.body().ommers(),
base_block_reward,
));
@@ -800,13 +795,9 @@ pub struct BlockStorageAccess {
/// Helper to construct a [`LocalizedTransactionTrace`] that describes a reward to the block
/// beneficiary.
fn reward_trace<H: BlockHeader>(
block_hash: BlockHash,
header: &H,
reward: RewardAction,
) -> LocalizedTransactionTrace {
fn reward_trace<H: BlockHeader>(header: &H, reward: RewardAction) -> LocalizedTransactionTrace {
LocalizedTransactionTrace {
block_hash: Some(block_hash),
block_hash: Some(header.hash_slow()),
block_number: Some(header.number()),
transaction_hash: None,
transaction_position: None,

View File

@@ -378,7 +378,7 @@ impl<N: ProviderNodeTypes> Pipeline<N> {
});
}
// update finalized and safe block if needed
// update finalized block if needed
let last_saved_finalized_block_number =
provider_rw.last_finalized_block_number()?;
@@ -392,16 +392,6 @@ impl<N: ProviderNodeTypes> Pipeline<N> {
))?;
}
let last_saved_safe_block_number = provider_rw.last_safe_block_number()?;
if last_saved_safe_block_number.is_none() ||
Some(checkpoint.block_number) < last_saved_safe_block_number
{
provider_rw.save_safe_block_number(BlockNumber::from(
checkpoint.block_number,
))?;
}
provider_rw.commit()?;
stage.post_unwind_commit()?;

View File

@@ -121,3 +121,5 @@ test-utils = [
"reth-evm-ethereum/test-utils",
"reth-tasks/test-utils",
]
rocksdb = ["reth-provider/rocksdb", "reth-db-common/rocksdb"]
edge = ["rocksdb"]

View File

@@ -1,7 +1,9 @@
use super::collect_account_history_indices;
use crate::stages::utils::{collect_history_indices, load_account_history};
use reth_config::config::{EtlConfig, IndexHistoryConfig};
use reth_db_api::{models::ShardedKey, tables, transaction::DbTxMut, Tables};
#[cfg(all(unix, feature = "rocksdb"))]
use reth_db_api::Tables;
use reth_db_api::{models::ShardedKey, tables, transaction::DbTxMut};
use reth_provider::{
DBProvider, EitherWriter, HistoryWriter, PruneCheckpointReader, PruneCheckpointWriter,
RocksDBProviderFactory, StorageSettingsCache,
@@ -142,6 +144,7 @@ where
Ok(((), writer.into_raw_rocksdb_batch()))
})?;
#[cfg(all(unix, feature = "rocksdb"))]
if use_rocksdb {
provider.commit_pending_rocksdb_batches()?;
provider.rocksdb_provider().flush(&[Tables::AccountsHistory.name()])?;
@@ -660,6 +663,7 @@ mod tests {
}
}
#[cfg(all(unix, feature = "rocksdb"))]
mod rocksdb_tests {
use super::*;
use reth_provider::{

View File

@@ -1,11 +1,12 @@
use super::{collect_history_indices, collect_storage_history_indices};
use crate::{stages::utils::load_storage_history, StageCheckpoint, StageId};
use reth_config::config::{EtlConfig, IndexHistoryConfig};
#[cfg(all(unix, feature = "rocksdb"))]
use reth_db_api::Tables;
use reth_db_api::{
models::{storage_sharded_key::StorageShardedKey, AddressStorageKey, BlockNumberAddress},
tables,
transaction::DbTxMut,
Tables,
};
use reth_provider::{
DBProvider, EitherWriter, HistoryWriter, PruneCheckpointReader, PruneCheckpointWriter,
@@ -147,6 +148,7 @@ where
Ok(((), writer.into_raw_rocksdb_batch()))
})?;
#[cfg(all(unix, feature = "rocksdb"))]
if use_rocksdb {
provider.commit_pending_rocksdb_batches()?;
provider.rocksdb_provider().flush(&[Tables::StoragesHistory.name()])?;
@@ -689,6 +691,7 @@ mod tests {
}
}
#[cfg(all(unix, feature = "rocksdb"))]
mod rocksdb_tests {
use super::*;
use reth_db_api::models::StorageBeforeTx;

View File

@@ -2,11 +2,12 @@ use alloy_eips::eip2718::Encodable2718;
use alloy_primitives::{TxHash, TxNumber};
use num_traits::Zero;
use reth_config::config::{EtlConfig, TransactionLookupConfig};
#[cfg(all(unix, feature = "rocksdb"))]
use reth_db_api::Tables;
use reth_db_api::{
table::{Decode, Decompress, Value},
tables,
transaction::DbTxMut,
Tables,
};
use reth_etl::Collector;
use reth_primitives_traits::{NodePrimitives, SignedTransaction};
@@ -198,6 +199,7 @@ where
}
}
#[cfg(all(unix, feature = "rocksdb"))]
if provider.cached_storage_settings().storage_v2 {
provider.commit_pending_rocksdb_batches()?;
provider.rocksdb_provider().flush(&[Tables::TransactionHashNumbers.name()])?;
@@ -599,6 +601,7 @@ mod tests {
}
}
#[cfg(all(unix, feature = "rocksdb"))]
mod rocksdb_tests {
use super::*;
use reth_provider::RocksDBProviderFactory;

View File

@@ -36,3 +36,6 @@ reth-testing-utils.workspace = true
assert_matches.workspace = true
tempfile.workspace = true
[features]
edge = ["reth-stages/edge"]

View File

@@ -110,7 +110,7 @@ impl Compact for AlloyHeader {
}
fn from_compact(buf: &[u8], len: usize) -> (Self, &[u8]) {
let (header, buf) = Header::from_compact(buf, len);
let (header, _) = Header::from_compact(buf, len);
let alloy_header = Self {
parent_hash: header.parent_hash,
ommers_hash: header.ommers_hash,

View File

@@ -93,3 +93,5 @@ op = [
"reth-codecs/op",
"reth-primitives-traits/op",
]
rocksdb = []
edge = ["rocksdb"]

View File

@@ -28,8 +28,20 @@ pub struct StorageSettings {
impl StorageSettings {
/// Returns the default base `StorageSettings`.
///
/// When the `edge` feature is enabled, returns [`Self::v2()`] so that CI and
/// edge builds automatically use v2 storage defaults. Otherwise returns
/// [`Self::v1()`]. The `--storage.v2` CLI flag can also opt into v2 at runtime
/// regardless of feature flags.
pub const fn base() -> Self {
Self::v2()
#[cfg(feature = "edge")]
{
Self::v2()
}
#[cfg(not(feature = "edge"))]
{
Self::v1()
}
}
/// Creates `StorageSettings` for v2 nodes with all storage features enabled:

Some files were not shown because too many files have changed in this diff Show More