Compare commits

...

32 Commits

Author SHA1 Message Date
rakita
d3771d341a add reth-core t4 patch (6b12498) 2026-04-16 10:58:25 +02:00
rakita
9936061e5f bump revm, revm-inspectors, and alloy-evm to t4 branches
revm: 89ecb25dbe49e1c3a10d99529e42f027d0bd2386
revm-inspectors: c6f88bbe7186d863f4667dd43c42608eb7a8ba5c
alloy-evm: ff0bbec9ccaa818155e25003a77f4d73d350bbd7
2026-04-16 10:26:28 +02:00
Alexey Shekhirin
199b7460a9 refactor: decouple CachedStateMetrics from SavedCache (#23552) 2026-04-15 21:30:15 +00:00
Derek Cofausper
41592ef1f8 fix(download): respect --datadir.static-files during extraction (#23445)
Co-authored-by: Matthias Seitz <19890894+mattsse@users.noreply.github.com>
Co-authored-by: Emma Jamieson-Hoare <21029500+emmajam@users.noreply.github.com>
Co-authored-by: Dan Cline <6798349+Rjected@users.noreply.github.com>
2026-04-15 21:19:42 +00:00
Arsenii Kulikov
bdbb8df17e fix: validate against executor output gas used (#23569) 2026-04-15 20:37:14 +00:00
Dan Cline
f451ad5380 feat(cli): add reth download config options (#23513) 2026-04-15 20:23:08 +00:00
AJStonewee
6e4009eed4 fix(rpc): prevent panic in log subscription on broadcast lag (#23561) 2026-04-15 19:23:33 +00:00
Brian Picciano
cf29b3fffe perf(engine): include backpressure in newPayload latency metric (#23541)
Co-authored-by: Brian Picciano <933154+mediocregopher@users.noreply.github.com>
Co-authored-by: Amp <amp@ampcode.com>
2026-04-15 17:14:35 +00:00
Matthias Seitz
7fe76a83d1 fix(net): encode block access lists as raw BAL RLP (#23536)
Co-authored-by: Arsenii Kulikov <klkvrr@gmail.com>
2026-04-15 12:42:16 +00:00
Ishika Choudhury
b1cff500ad chore(BAL): remove debug_get_block_access_list (#23534) 2026-04-15 12:33:37 +00:00
figtracer
0b33057414 fix(init-state): write accounts directly with chunked commits (#23469) 2026-04-15 10:52:53 +00:00
Soubhik Singha Mahapatra
3891092ee9 chore: add amsterdam time to chainspec (#23526) 2026-04-15 10:29:14 +00:00
Emma Jamieson-Hoare
8784aa45fc chore: bump revm to v37 (EIP-8037 state gas) (#23191)
Co-authored-by: Federico Gimenez <federico.gimenez@gmail.com>
Co-authored-by: Federico Gimenez <fgimenez@users.noreply.github.com>
Co-authored-by: klkvr <klkvrr@gmail.com>
Co-authored-by: Matthias Seitz <matthias.seitz@outlook.de>
2026-04-15 10:08:12 +00:00
Brian Picciano
f1d90612e3 feat(ci): add slack=on-win mode to bench workflows (#23522)
Co-authored-by: Amp <amp@ampcode.com>
2026-04-15 09:45:37 +00:00
Derek Cofausper
03d69f59a5 chore(ci): add @ai investigate to bench failure alerts (#23520)
Co-authored-by: Alexey Shekhirin <5773434+shekhirin@users.noreply.github.com>
2026-04-15 09:19:14 +00:00
Ishika Choudhury
d372c8f5a9 chore(BAL): added gas limit fn to ExecutionPayload (#23518) 2026-04-15 09:01:35 +00:00
Arsenii Kulikov
dbb8495be1 fix: allow adding peers without overriding kind (#23516) 2026-04-14 21:00:39 +00:00
Ishika Choudhury
044db3ec95 feat: implement try into v6 (#23497)
Co-authored-by: Soubhik Singha Mahapatra <soubhiksmp2004@gmail.com>
Co-authored-by: Soubhik Singha Mahapatra <160333583+Soubhik-10@users.noreply.github.com>
2026-04-14 20:04:21 +00:00
Matthias Seitz
13217d5517 feat(discv4): add AddBootNode command (#23515) 2026-04-14 19:32:38 +00:00
Matthias Seitz
0165569bc1 feat(net): add discv4/discv5 getters to NetworkHandle (#23514) 2026-04-14 19:25:54 +00:00
Brian Picciano
84c14fe0a8 ci(bench): replace no_slack boolean with slack dropdown (always/on-error/never) (#23501)
Co-authored-by: Amp <amp@ampcode.com>
2026-04-14 15:57:20 +00:00
Tim
5b4af55017 feat: add reqwest-rustls to support otlp endpoints with https (#23495) 2026-04-14 14:09:09 +00:00
Dan Cline
b8ab2c628e chore(cli): add binary name and chain detection in tempo download log (#23356)
Co-authored-by: Derek Cofausper <256792747+decofe@users.noreply.github.com>
Co-authored-by: Emma Jamieson-Hoare <21029500+emmajam@users.noreply.github.com>
2026-04-14 13:23:29 +00:00
Brian Picciano
766f4317a6 chore(bench): reduce default blocks to 200, warmup to 20 for big-blocks (#23494)
Co-authored-by: mediocregopher <mediocregopher@users.noreply.github.com>
Co-authored-by: Amp <amp@ampcode.com>
2026-04-14 13:13:21 +00:00
Derek Cofausper
c20d897efe fix(node): downgrade prune config log from warn to info (#23493)
Co-authored-by: Brian Picciano <933154+mediocregopher@users.noreply.github.com>
2026-04-14 11:43:43 +00:00
Brian Picciano
ad1e8f2cea feat(bench): BAL capture, replay, and inline payload decoding (#23434)
Co-authored-by: Soubhik Singha Mahapatra <soubhiksmp2004@gmail.com>
Co-authored-by: Soubhik Singha Mahapatra <160333583+Soubhik-10@users.noreply.github.com>
Co-authored-by: Brian Picciano <933154+mediocregopher@users.noreply.github.com>
Co-authored-by: Amp <amp@ampcode.com>
Co-authored-by: steven <corderosteven6@gmail.com>
Co-authored-by: Derek Cofausper <256792747+decofe@users.noreply.github.com>
2026-04-14 11:23:13 +00:00
Derek Cofausper
51309ff55c fix(bench): retry on all transport errors (#23491)
Co-authored-by: Alexey Shekhirin <5773434+shekhirin@users.noreply.github.com>
Co-authored-by: Alexey Shekhirin <github@shekhirin.com>
2026-04-14 10:42:05 +00:00
Soubhik Singha Mahapatra
e0aac5015f chore(BAL): added newPayloadV5 and getPayloadV6 (#23486)
Co-authored-by: Ishika Choudhury <117741714+Rimeeeeee@users.noreply.github.com>
2026-04-14 08:57:42 +00:00
Ishika Choudhury
3b8290439a chore(BAL): added helper functions for building (#23490) 2026-04-14 08:49:49 +00:00
yottaes
1a2836ff53 feat(rpc): support transactionReceipts subscription in eth_subscribe (#23485)
Co-authored-by: Matthias Seitz <matthias.seitz@outlook.de>
2026-04-14 08:42:01 +00:00
MagicJoshh
bce7368a82 fix(engine): use IndexSet for deterministic block buffer child ordering (#22676) 2026-04-13 19:18:23 +00:00
MagicJoshh
1e461ef281 fix(trie): terminate depth-first iterator on database error (#22709) 2026-04-13 17:48:07 +00:00
83 changed files with 2663 additions and 795 deletions

View File

@@ -7,6 +7,7 @@
#
# Required env: SCHELK_MOUNT, BENCH_RPC_URL, BENCH_BLOCKS, BENCH_WARMUP_BLOCKS
# Optional env: BENCH_BIG_BLOCKS (true/false), BENCH_WORK_DIR (for big blocks path)
# BENCH_BAL (false/true/feature/baseline; only used with big blocks)
# BENCH_WAIT_TIME (duration like 500ms, default empty)
# BENCH_BASELINE_ARGS (extra reth node args for baseline runs)
# BENCH_FEATURE_ARGS (extra reth node args for feature runs)
@@ -249,11 +250,33 @@ fi
if [ "$BIG_BLOCKS" = "true" ]; then
# Big blocks mode: replay pre-generated payloads
BIG_BLOCKS_DIR="${BENCH_BIG_BLOCKS_DIR:-${BENCH_WORK_DIR}/big-blocks}"
BENCH_BAL_MODE="${BENCH_BAL:-false}"
BB_BENCH_ARGS=(--reth-new-payload)
if [ -n "${BENCH_WAIT_TIME:-}" ]; then
BB_BENCH_ARGS+=(--wait-time "$BENCH_WAIT_TIME")
fi
case "$BENCH_BAL_MODE" in
false)
;;
true)
BB_BENCH_ARGS+=(--bal)
;;
baseline)
if [[ "$LABEL" == baseline* ]]; then
BB_BENCH_ARGS+=(--bal)
fi
;;
feature)
if [[ "$LABEL" == feature* ]]; then
BB_BENCH_ARGS+=(--bal)
fi
;;
*)
echo "::error::Unknown BENCH_BAL value: $BENCH_BAL_MODE"
exit 1
;;
esac
# Warmup
WARMUP="${BENCH_WARMUP_BLOCKS:-50}"

View File

@@ -111,6 +111,14 @@ def compute_stats(combined: list[dict]) -> dict:
wall_clock_s = sum(total_latencies_ms) / 1_000
mean_total_lat_ms = sum(total_latencies_ms) / n
# Persistence wait mean (for main table)
persist_values_ms = []
for r in combined:
v = r.get("persistence_wait_us")
if v is not None:
persist_values_ms.append(v / 1_000)
mean_persist_ms = sum(persist_values_ms) / len(persist_values_ms) if persist_values_ms else 0.0
return {
"n": n,
"mean_ms": mean_lat,
@@ -121,6 +129,7 @@ def compute_stats(combined: list[dict]) -> dict:
"mean_mgas_s": mean_mgas_s,
"wall_clock_s": wall_clock_s,
"mean_total_lat_ms": mean_total_lat_ms,
"mean_persist_ms": mean_persist_ms,
}
@@ -145,7 +154,7 @@ def compute_wait_stats(combined: list[dict], field: str) -> dict:
def _paired_data(
baseline: list[dict], feature: list[dict]
) -> tuple[list[tuple[float, float]], list[float], list[float], list[float]]:
) -> tuple[list[tuple[float, float]], list[float], list[float], list[float], list[float]]:
"""Match blocks and return paired latencies and per-block diffs.
Returns:
@@ -153,6 +162,7 @@ def _paired_data(
lat_diffs_ms: list of feature baseline latency diffs in ms
mgas_diffs: list of feature baseline Mgas/s diffs
total_lat_diffs_ms: list of feature baseline total latency diffs in ms
persist_diffs_ms: list of feature baseline persistence wait diffs in ms
"""
baseline_by_block = {r["block_number"]: r for r in baseline}
feature_by_block = {r["block_number"]: r for r in feature}
@@ -162,6 +172,7 @@ def _paired_data(
lat_diffs_ms = []
mgas_diffs = []
total_lat_diffs_ms = []
persist_diffs_ms = []
for bn in common_blocks:
b = baseline_by_block[bn]
f = feature_by_block[bn]
@@ -179,7 +190,10 @@ def _paired_data(
total_lat_diffs_ms.append(
f["total_latency_us"] / 1_000 - b["total_latency_us"] / 1_000
)
return pairs, lat_diffs_ms, mgas_diffs, total_lat_diffs_ms
b_persist = (b.get("persistence_wait_us") or 0) / 1_000
f_persist = (f.get("persistence_wait_us") or 0) / 1_000
persist_diffs_ms.append(f_persist - b_persist)
return pairs, lat_diffs_ms, mgas_diffs, total_lat_diffs_ms, persist_diffs_ms
def compute_paired_stats(
@@ -195,13 +209,15 @@ def compute_paired_stats(
all_lat_diffs = []
all_mgas_diffs = []
all_total_lat_diffs = []
all_persist_diffs = []
blocks_per_pair = []
for baseline, feature in zip(baseline_runs, feature_runs):
pairs, lat_diffs, mgas_diffs, total_lat_diffs = _paired_data(baseline, feature)
pairs, lat_diffs, mgas_diffs, total_lat_diffs, persist_diffs = _paired_data(baseline, feature)
all_pairs.extend(pairs)
all_lat_diffs.extend(lat_diffs)
all_mgas_diffs.extend(mgas_diffs)
all_total_lat_diffs.extend(total_lat_diffs)
all_persist_diffs.extend(persist_diffs)
blocks_per_pair.append(len(pairs))
if not all_lat_diffs:
@@ -245,6 +261,11 @@ def compute_paired_stats(
total_se = std_total_diff / math.sqrt(len(all_total_lat_diffs)) if all_total_lat_diffs else 0.0
wall_clock_ci_ms = T_CRITICAL * total_se
mean_persist_diff = sum(all_persist_diffs) / len(all_persist_diffs) if all_persist_diffs else 0.0
std_persist_diff = stddev(all_persist_diffs, mean_persist_diff) if len(all_persist_diffs) > 1 else 0.0
persist_se = std_persist_diff / math.sqrt(len(all_persist_diffs)) if all_persist_diffs else 0.0
persist_ci_ms = T_CRITICAL * persist_se
return {
"n": n,
"mean_diff_ms": mean_diff,
@@ -258,6 +279,7 @@ def compute_paired_stats(
"mean_mgas_diff": mean_mgas_diff,
"mgas_ci": mgas_ci,
"wall_clock_ci_ms": wall_clock_ci_ms,
"persist_ci_ms": persist_ci_ms,
"blocks": max(blocks_per_pair),
}
@@ -290,6 +312,14 @@ def fmt_s(v: float) -> str:
return f"{v:.2f}s"
def display_bal_mode(bal_mode: str | None) -> str | None:
if not bal_mode or bal_mode == "false":
return None
if bal_mode == "both":
return "true"
return bal_mode
def significance(pct: float, ci_pct: float, lower_is_better: bool) -> str:
"""Return significance label: 'good', 'bad', or 'neutral'."""
significant = abs(pct) > ci_pct
@@ -328,6 +358,7 @@ def compute_changes(
("p99", "p99_ms", "p99_ci_ms", "p99_ms", True),
("mgas_s", "mean_mgas_s", "mgas_ci", "mean_mgas_s", False),
("wall_clock", "wall_clock_s", "wall_clock_ci_ms", "mean_total_lat_ms", True),
("persist_wait", "mean_persist_ms", "persist_ci_ms", "mean_persist_ms", True),
]
changes = {}
for name, stat_key, ci_key, base_key, lower_is_better in metrics:
@@ -353,6 +384,7 @@ def generate_comparison_table(
big_blocks: bool = False,
warmup_blocks: str | None = None,
wait_time: str | None = None,
bal_mode: str | None = None,
) -> str:
"""Generate a markdown comparison table between baseline and feature."""
n = paired["blocks"]
@@ -368,6 +400,8 @@ def generate_comparison_table(
p90_pct = pct(run1["p90_ms"], run2["p90_ms"])
p99_pct = pct(run1["p99_ms"], run2["p99_ms"])
persist_pct = pct(run1["mean_persist_ms"], run2["mean_persist_ms"])
# Bootstrap CIs as % of baseline percentile
p50_ci_pct = paired["p50_ci_ms"] / run1["p50_ms"] * 100.0 if run1["p50_ms"] > 0 else 0.0
p90_ci_pct = paired["p90_ci_ms"] / run1["p90_ms"] * 100.0 if run1["p90_ms"] > 0 else 0.0
@@ -377,6 +411,7 @@ def generate_comparison_table(
lat_ci_pct = paired["ci_ms"] / run1["mean_ms"] * 100.0 if run1["mean_ms"] > 0 else 0.0
mgas_ci_pct = paired["mgas_ci"] / run1["mean_mgas_s"] * 100.0 if run1["mean_mgas_s"] > 0 else 0.0
wall_ci_pct = paired["wall_clock_ci_ms"] / run1["mean_total_lat_ms"] * 100.0 if run1["mean_total_lat_ms"] > 0 else 0.0
persist_ci_pct = paired["persist_ci_ms"] / run1["mean_persist_ms"] * 100.0 if run1["mean_persist_ms"] > 0 else 0.0
base_url = f"https://github.com/{repo}/commit"
baseline_label = f"[`{baseline_name}`]({base_url}/{baseline_ref})"
@@ -392,6 +427,7 @@ def generate_comparison_table(
f"| P99 | {fmt_ms(run1['p99_ms'])} | {fmt_ms(run2['p99_ms'])} | {change_str(p99_pct, p99_ci_pct, lower_is_better=True)} |",
f"| Mgas/s | {fmt_mgas(run1['mean_mgas_s'])} | {fmt_mgas(run2['mean_mgas_s'])} | {change_str(gas_pct, mgas_ci_pct, lower_is_better=False)} |",
f"| Wall Clock | {fmt_s(run1['wall_clock_s'])} | {fmt_s(run2['wall_clock_s'])} | {change_str(wall_pct, wall_ci_pct, lower_is_better=True)} |",
f"| Persist Wait | {fmt_ms(run1['mean_persist_ms'])} | {fmt_ms(run2['mean_persist_ms'])} | {change_str(persist_pct, persist_ci_pct, lower_is_better=True)} |",
"",
]
meta_parts = [f"{n} {'big blocks' if big_blocks else 'blocks'}"]
@@ -399,6 +435,9 @@ def generate_comparison_table(
meta_parts.append(f"{warmup_blocks} warmup")
if wait_time:
meta_parts.append(f"wait time: {wait_time}")
display_mode = display_bal_mode(bal_mode)
if big_blocks and display_mode:
meta_parts.append(f"BAL: {display_mode}")
lines.append(f"*{', '.join(meta_parts)}*")
return "\n".join(lines)
@@ -481,6 +520,7 @@ def main():
parser.add_argument("--big-blocks", action="store_true", default=False, help="Big blocks mode")
parser.add_argument("--warmup-blocks", default=None, help="Number of warmup blocks")
parser.add_argument("--wait-time", default=None, help="Wait time interval used between blocks")
parser.add_argument("--bal-mode", default=None, help="BAL mode (true, feature, baseline)")
parser.add_argument("--grafana-url", default=None, help="Grafana dashboard URL for this benchmark run")
args = parser.parse_args()
@@ -520,6 +560,7 @@ def main():
baseline_name = args.baseline_name or "baseline"
feature_name = args.feature_name or "feature"
feature_sha = args.feature_ref or "unknown"
bal_mode = display_bal_mode(args.bal_mode)
comparison_table = generate_comparison_table(
baseline_stats,
@@ -533,6 +574,7 @@ def main():
big_blocks=args.big_blocks,
warmup_blocks=args.warmup_blocks,
wait_time=args.wait_time,
bal_mode=bal_mode,
)
print(f"Generated comparison ({paired_stats['n']} paired blocks, "
f"mean diff {paired_stats['mean_diff_ms']:+.3f}ms ± {paired_stats['ci_ms']:.3f}ms)")
@@ -566,6 +608,7 @@ def main():
"big_blocks": args.big_blocks,
"warmup_blocks": args.warmup_blocks,
"wait_time": args.wait_time,
"bal_mode": bal_mode,
"baseline": {
"name": baseline_name,
"ref": baseline_ref,

View File

@@ -250,6 +250,8 @@ async function success({ core, context }) {
}
}
const slackMode = process.env.BENCH_SLACK || 'always';
// Post to public channel if any metric shows significant improvement or regression
const channel = process.env.SLACK_BENCH_CHANNEL;
let postedToChannel = false;
@@ -264,6 +266,14 @@ async function success({ core, context }) {
}
}
// In on-win mode, only notify on improvement — skip DM fallback entirely
if (slackMode === 'on-win') {
if (!postedToChannel) {
core.info('on-win mode: no improvement detected, skipping all notifications');
}
return;
}
// DM the actor only when results were not posted to the public channel
if (!postedToChannel) {
if (actorSlackId) {

View File

@@ -39,10 +39,25 @@ function loadSamplyUrls(workDir) {
return urls;
}
function balModeLabel(mode) {
switch (mode) {
case 'true':
case 'feature':
case 'baseline':
return mode;
case 'both':
return 'true';
default:
return '';
}
}
function blocksLabel(summary) {
const parts = [];
if (summary.big_blocks) {
parts.push({ key: 'Big Blocks', value: summary.blocks });
const balMode = balModeLabel(summary.bal_mode || summary.bal || process.env.BENCH_BAL || 'false');
if (balMode) parts.push({ key: 'BAL', value: balMode });
} else {
const warmup = summary.warmup_blocks || process.env.BENCH_WARMUP_BLOCKS || '';
if (warmup) parts.push({ key: 'Warmup', value: warmup });
@@ -68,6 +83,7 @@ function metricRows(summary) {
{ label: 'P99', baseline: fmtMs(b.p99_ms), feature: fmtMs(f.p99_ms), change: fmtChange(c.p99) },
{ label: 'Mgas/s', baseline: fmtMgas(b.mean_mgas_s), feature: fmtMgas(f.mean_mgas_s), change: fmtChange(c.mgas_s) },
{ label: 'Wall Clock', baseline: fmtS(b.wall_clock_s), feature: fmtS(f.wall_clock_s), change: fmtChange(c.wall_clock) },
{ label: 'Persist Wait', baseline: fmtMs(b.mean_persist_ms || 0), feature: fmtMs(f.mean_persist_ms || 0), change: fmtChange(c.persist_wait) },
];
}

View File

@@ -21,7 +21,6 @@ engine-cancun:
# Affects all clients, not just reth. Tracked: https://github.com/ethereum/hive/issues/1382
- Invalid Missing Ancestor Syncing ReOrg, Timestamp, EmptyTxs=False, CanonicalReOrg=False, Invalid P8 (Cancun) (reth)
- Invalid Missing Ancestor Syncing ReOrg, Timestamp, EmptyTxs=False, CanonicalReOrg=True, Invalid P8 (Cancun) (reth)
- Multiple New Payloads Extending Canonical Chain, Wait for Canonical Payload (Cancun) (reth)
engine-api:
- Transaction Re-Org, Re-Org Out (Paris) (reth)
- Transaction Re-Org, Re-Org to Different Block (Paris) (reth)
@@ -31,5 +30,3 @@ engine-api:
- Invalid Missing Ancestor Syncing ReOrg, Transaction Signature, EmptyTxs=False, CanonicalReOrg=True, Invalid P9 (Paris) (reth)
- Invalid Missing Ancestor Syncing ReOrg, Transaction Signature, EmptyTxs=False, CanonicalReOrg=False, Invalid P9 (Paris) (reth)
- Invalid Missing Ancestor ReOrg, StateRoot, EmptyTxs=True, Invalid P10 (Paris) (reth)
- Multiple New Payloads Extending Canonical Chain, Wait for Canonical Payload (Paris) (reth)
- Multiple New Payloads Extending Canonical Chain, Set Head to First Payload Received (Paris) (reth)

View File

@@ -28,11 +28,16 @@ on:
required: false
default: false
type: boolean
no_slack:
description: "Suppress Slack notifications"
slack:
description: "Slack notification policy"
required: false
default: true
type: boolean
default: "never"
type: choice
options:
- always
- on-win
- on-error
- never
mode:
description: "Benchmark mode"
required: false
@@ -106,7 +111,7 @@ jobs:
.github/scripts/bench-scheduled-refs.sh "$FORCE" "$MODE"
- name: Alert on long-running hourly
if: steps.mode.outputs.mode == 'hourly' && steps.refs.outputs.long-running == 'true'
if: steps.mode.outputs.mode == 'hourly' && steps.refs.outputs.long-running == 'true' && !(github.event_name == 'workflow_dispatch' && inputs.slack == 'never')
uses: actions/github-script@v8
env:
SLACK_BENCH_BOT_TOKEN: ${{ secrets.SLACK_BENCH_BOT_TOKEN }}
@@ -148,7 +153,7 @@ jobs:
});
- name: Alert on stale nightly
if: steps.mode.outputs.mode == 'nightly' && steps.refs.outputs.is-stale == 'true'
if: steps.mode.outputs.mode == 'nightly' && steps.refs.outputs.is-stale == 'true' && !(github.event_name == 'workflow_dispatch' && inputs.slack == 'never')
uses: actions/github-script@v8
env:
SLACK_BENCH_BOT_TOKEN: ${{ secrets.SLACK_BENCH_BOT_TOKEN }}
@@ -256,7 +261,7 @@ jobs:
BENCH_FEATURE_ARGS: ""
BENCH_ABBA: "true"
BENCH_COMMENT_ID: ""
BENCH_NO_SLACK: ${{ github.event_name == 'workflow_dispatch' && inputs.no_slack == true && 'true' || 'false' }}
BENCH_SLACK: ${{ github.event_name == 'workflow_dispatch' && inputs.slack || 'always' }}
BENCH_METRICS_ADDR: "127.0.0.1:9100"
BENCH_OTLP_DISABLED: "true"
BASELINE_REF: ${{ needs.resolve-refs.outputs.baseline-ref }}
@@ -753,7 +758,7 @@ jobs:
await core.summary.addRaw(md).write();
- name: Send Slack notification (success)
if: success() && env.BENCH_NO_SLACK != 'true'
if: success() && (env.BENCH_SLACK == 'always' || env.BENCH_SLACK == 'on-win')
uses: actions/github-script@v8
env:
SLACK_BENCH_BOT_TOKEN: ${{ secrets.SLACK_BENCH_BOT_TOKEN }}
@@ -781,7 +786,15 @@ jobs:
// Filter notifications based on mode
const changes = summary.changes || {};
const mode = process.env.BENCH_MODE || 'nightly';
const slackMode = process.env.BENCH_SLACK || 'always';
const hasRegression = Object.values(changes).some(c => c.sig === 'bad');
const hasImprovement = Object.values(changes).some(c => c.sig === 'good');
// on-win mode: only notify on improvements
if (slackMode === 'on-win' && !hasImprovement) {
core.info('on-win mode: no improvement detected, skipping Slack notification');
return;
}
// Hourly mode: only notify on regressions
if (mode === 'hourly' && !hasRegression) {
@@ -900,7 +913,7 @@ jobs:
}
- name: Send Slack notification (failure)
if: failure()
if: failure() && env.BENCH_SLACK != 'never' && env.BENCH_SLACK != 'on-win'
uses: actions/github-script@v8
env:
SLACK_BENCH_BOT_TOKEN: ${{ secrets.SLACK_BENCH_BOT_TOKEN }}
@@ -935,7 +948,7 @@ jobs:
},
{
type: 'section',
text: { type: 'mrkdwn', text: `*${modeLabel} regression* failed while *${failedStep}*\ncc <@U09FARE0B9Q> <@U09FAL2UMLJ>` },
text: { type: 'mrkdwn', text: `*${modeLabel} regression* failed while *${failedStep}*\ncc <@U09FARE0B9Q> <@U09FAL2UMLJ>\n@ai investigate this` },
},
{
type: 'actions',

View File

@@ -14,13 +14,23 @@ on:
blocks:
description: "Number of blocks to benchmark"
required: false
default: "500"
default: "200"
type: string
big_blocks:
description: "Use big blocks mode (pre-generated merged payloads with reth-bb)"
required: false
default: "false"
type: boolean
bal:
description: "Replay block access lists during big-block benchmarks"
required: false
default: "false"
type: choice
options:
- "false"
- "true"
- "feature"
- "baseline"
warmup:
description: "Number of warmup blocks"
required: false
@@ -61,11 +71,16 @@ on:
required: false
default: "0"
type: string
no_slack:
description: "Suppress Slack notifications for benchmark results"
slack:
description: "Slack notification policy"
required: false
default: "true"
type: boolean
default: "never"
type: choice
options:
- always
- on-win
- on-error
- never
abba:
description: "Run ABBA (BFFB) interleaved order; false = single AB pass"
required: false
@@ -105,9 +120,10 @@ jobs:
baseline-name: ${{ steps.args.outputs.baseline-name }}
feature-name: ${{ steps.args.outputs.feature-name }}
samply: ${{ steps.args.outputs.samply }}
no-slack: ${{ steps.args.outputs.no-slack }}
slack: ${{ steps.args.outputs.slack }}
cores: ${{ steps.args.outputs.cores }}
big-blocks: ${{ steps.args.outputs.big-blocks }}
bal: ${{ steps.args.outputs.bal }}
wait-time: ${{ steps.args.outputs.wait-time }}
baseline-args: ${{ steps.args.outputs.baseline-args }}
feature-args: ${{ steps.args.outputs.feature-args }}
@@ -140,18 +156,24 @@ jobs:
with:
github-token: ${{ secrets.DEREK_PAT }}
script: |
let pr, actor, blocks, warmup, baseline, feature, samply, cores, bigBlocks;
const validBalModes = new Set(['false', 'true', 'feature', 'baseline']);
const validSlackModes = new Set(['always', 'on-win', 'on-error', 'never']);
const usage = '`@decofe bench [blocks=N] [big-blocks[=true|false]] [bal=true|false|feature|baseline] [warmup=N] [baseline=REF] [feature=REF] [samply] [slack=always|on-win|on-error|never] [cores=N] [abba=true|false] [otlp=true|false] [wait-time=DURATION] [baseline-args="..."] [feature-args="..."]`';
let pr, actor, blocks, warmup, baseline, feature, samply, cores, bigBlocks, bal;
let explicitWarmup = false;
if (context.eventName === 'workflow_dispatch') {
actor = '${{ github.actor }}';
blocks = '${{ github.event.inputs.blocks }}' || '500';
blocks = '${{ github.event.inputs.blocks }}' || '200';
warmup = '${{ github.event.inputs.warmup }}' || '100';
if (warmup !== '100') explicitWarmup = true;
baseline = '${{ github.event.inputs.baseline }}';
feature = '${{ github.event.inputs.feature }}';
samply = '${{ github.event.inputs.samply }}' === 'true' ? 'true' : 'false';
var noSlack = '${{ github.event.inputs.no_slack }}' !== 'false' ? 'true' : 'false';
var slack = '${{ github.event.inputs.slack }}' || 'never';
cores = '${{ github.event.inputs.cores }}' || '0';
bigBlocks = '${{ github.event.inputs.big_blocks }}' === 'true' ? 'true' : 'false';
bal = '${{ github.event.inputs.bal }}' || 'false';
var abba = '${{ github.event.inputs.abba }}' !== 'false' ? 'true' : 'false';
var otlp = '${{ github.event.inputs.otlp }}' !== 'false' ? 'true' : 'false';
var waitTime = '${{ github.event.inputs.wait_time }}' || '';
@@ -178,11 +200,12 @@ jobs:
const body = context.payload.comment.body.trim();
const intArgs = new Set(['warmup', 'cores', 'blocks']);
const refArgs = new Set(['baseline', 'feature']);
const boolArgs = new Set(['samply', 'no-slack', 'big-blocks']);
const boolArgs = new Set(['samply', 'big-blocks']);
const boolDefaultTrue = new Set(['abba', 'otlp']);
const enumArgs = new Map([['bal', validBalModes], ['slack', validSlackModes]]);
const durationArgs = new Set(['wait-time']);
const stringArgs = new Set(['baseline-args', 'feature-args']);
const defaults = { blocks: '500', warmup: '100', baseline: '', feature: '', samply: 'false', 'no-slack': 'false', 'big-blocks': 'false', cores: '0', abba: 'true', otlp: 'true', 'wait-time': '', 'baseline-args': '', 'feature-args': '' };
const defaults = { blocks: '200', warmup: '100', baseline: '', feature: '', samply: 'false', slack: 'always', 'big-blocks': 'false', bal: 'false', cores: '0', abba: 'true', otlp: 'true', 'wait-time': '', 'baseline-args': '', 'feature-args': '' };
const unknown = [];
const invalid = [];
const args = body.replace(/^(?:@decofe|derek) bench\s*/, '');
@@ -209,7 +232,7 @@ jobs:
if ((value.startsWith('"') && value.endsWith('"')) || (value.startsWith("'") && value.endsWith("'"))) {
value = value.slice(1, -1);
}
if (boolDefaultTrue.has(key)) {
if (boolArgs.has(key) || boolDefaultTrue.has(key)) {
if (value === 'true' || value === 'false') {
defaults[key] = value;
} else {
@@ -221,11 +244,18 @@ jobs:
} else {
invalid.push(`\`${key}=${value}\` (must be a duration like 500ms, 1s, 2m)`);
}
} else if (enumArgs.has(key)) {
if (enumArgs.get(key).has(value)) {
defaults[key] = value;
} else {
invalid.push(`\`${key}=${value}\` (must be true, false, feature, or baseline)`);
}
} else if (intArgs.has(key)) {
if (!/^\d+$/.test(value)) {
invalid.push(`\`${key}=${value}\` (must be a positive integer)`);
} else {
defaults[key] = value;
if (key === 'warmup') explicitWarmup = true;
}
} else if (refArgs.has(key)) {
if (!value) {
@@ -243,7 +273,7 @@ jobs:
if (unknown.length) errors.push(`Unknown argument(s): \`${unknown.join('`, `')}\``);
if (invalid.length) errors.push(`Invalid value(s): ${invalid.join(', ')}`);
if (errors.length) {
const msg = `❌ **Invalid bench command**\n\n${errors.join('\n')}\n\n**Usage:** \`@decofe bench [blocks=N] [big-blocks] [warmup=N] [baseline=REF] [feature=REF] [samply] [no-slack] [cores=N] [abba=true|false] [otlp=true|false] [wait-time=DURATION] [baseline-args="..."] [feature-args="..."]\``;
const msg = `❌ **Invalid bench command**\n\n${errors.join('\n')}\n\n**Usage:** ${usage}`;
await github.rest.issues.createComment({
owner: context.repo.owner,
repo: context.repo.repo,
@@ -258,9 +288,10 @@ jobs:
baseline = defaults.baseline;
feature = defaults.feature;
samply = defaults.samply;
var noSlack = defaults['no-slack'];
var slack = defaults.slack;
cores = defaults.cores;
bigBlocks = defaults['big-blocks'];
bal = defaults.bal;
var abba = defaults.abba;
var otlp = defaults.otlp;
var waitTime = defaults['wait-time'];
@@ -268,6 +299,29 @@ jobs:
var featureNodeArgs = defaults['feature-args'];
}
// Default warmup to 20 for big-blocks mode unless explicitly set
if (bigBlocks === 'true' && !explicitWarmup) {
warmup = '20';
}
if (!validBalModes.has(bal)) {
core.setFailed(`Invalid bal mode: ${bal}`);
return;
}
if (bal !== 'false' && bigBlocks !== 'true') {
const msg = `❌ **Invalid bench command**\n\n\`bal\` requires \`big-blocks=true\`.\n\n**Usage:** ${usage}`;
if (context.eventName === 'issue_comment') {
await github.rest.issues.createComment({
owner: context.repo.owner,
repo: context.repo.repo,
issue_number: context.issue.number,
body: msg,
});
}
core.setFailed(msg);
return;
}
// Resolve display names for baseline/feature
let baselineName = baseline || 'main';
let featureName = feature;
@@ -293,9 +347,10 @@ jobs:
core.setOutput('baseline-name', baselineName);
core.setOutput('feature-name', featureName);
core.setOutput('samply', samply);
core.setOutput('no-slack', noSlack);
core.setOutput('slack', slack);
core.setOutput('cores', cores);
core.setOutput('big-blocks', bigBlocks);
core.setOutput('bal', bal);
core.setOutput('wait-time', waitTime);
core.setOutput('baseline-args', baselineNodeArgs);
core.setOutput('feature-args', featureNodeArgs);
@@ -358,10 +413,12 @@ jobs:
const baseline = '${{ steps.args.outputs.baseline-name }}';
const feature = '${{ steps.args.outputs.feature-name }}';
const samply = '${{ steps.args.outputs.samply }}' === 'true';
const noSlack = '${{ steps.args.outputs.no-slack }}' === 'true';
const slack = '${{ steps.args.outputs.slack }}' || 'always';
const bigBlocks = '${{ steps.args.outputs.big-blocks }}' === 'true';
const bal = '${{ steps.args.outputs.bal }}' || 'false';
const samplyNote = samply ? ', samply: `enabled`' : '';
const noSlackNote = noSlack ? ', no-slack' : '';
const slackNote = slack !== 'always' ? `, slack: \`${slack}\`` : '';
const balNote = bigBlocks && bal !== 'false' ? `, BAL: \`${bal}\`` : '';
const cores = '${{ steps.args.outputs.cores }}';
const coresNote = cores && cores !== '0' ? `, cores: \`${cores}\`` : '';
const abbaEnabled = '${{ steps.args.outputs.abba }}' !== 'false';
@@ -375,7 +432,7 @@ jobs:
const featureArgsVal = '${{ steps.args.outputs.feature-args }}';
const featureArgsNote = featureArgsVal ? `, feature-args: \`${featureArgsVal}\`` : '';
const blocksDesc = bigBlocks ? 'blocks: `big`' : `${blocks} blocks, ${warmup} warmup blocks`;
const config = `**Config:** ${blocksDesc}, baseline: \`${baseline}\`, feature: \`${feature}\`${samplyNote}${noSlackNote}${coresNote}${abbaNote}${otlpNote}${waitTimeNote}${baselineArgsNote}${featureArgsNote}`;
const config = `**Config:** ${blocksDesc}, baseline: \`${baseline}\`, feature: \`${feature}\`${samplyNote}${slackNote}${balNote}${coresNote}${abbaNote}${otlpNote}${waitTimeNote}${baselineArgsNote}${featureArgsNote}`;
const { data: comment } = await github.rest.issues.createComment({
owner: context.repo.owner,
@@ -400,10 +457,12 @@ jobs:
const baseline = '${{ steps.args.outputs.baseline-name }}';
const feature = '${{ steps.args.outputs.feature-name }}';
const samply = '${{ steps.args.outputs.samply }}' === 'true';
const noSlack = '${{ steps.args.outputs.no-slack }}' === 'true';
const slack = '${{ steps.args.outputs.slack }}' || 'always';
const bigBlocks = '${{ steps.args.outputs.big-blocks }}' === 'true';
const bal = '${{ steps.args.outputs.bal }}' || 'false';
const samplyNote = samply ? ', samply: `enabled`' : '';
const noSlackNote = noSlack ? ', no-slack' : '';
const slackNote = slack !== 'always' ? `, slack: \`${slack}\`` : '';
const balNote = bigBlocks && bal !== 'false' ? `, BAL: \`${bal}\`` : '';
const cores = '${{ steps.args.outputs.cores }}';
const coresNote = cores && cores !== '0' ? `, cores: \`${cores}\`` : '';
const abbaEnabled = '${{ steps.args.outputs.abba }}' !== 'false';
@@ -417,7 +476,7 @@ jobs:
const featureArgsVal = '${{ steps.args.outputs.feature-args }}';
const featureArgsNote = featureArgsVal ? `, feature-args: \`${featureArgsVal}\`` : '';
const blocksDesc = bigBlocks ? 'blocks: `big`' : `${blocks} blocks, ${warmup} warmup blocks`;
const config = `**Config:** ${blocksDesc}, baseline: \`${baseline}\`, feature: \`${feature}\`${samplyNote}${noSlackNote}${coresNote}${abbaNote}${otlpNote}${waitTimeNote}${baselineArgsNote}${featureArgsNote}`;
const config = `**Config:** ${blocksDesc}, baseline: \`${baseline}\`, feature: \`${feature}\`${samplyNote}${slackNote}${balNote}${coresNote}${abbaNote}${otlpNote}${waitTimeNote}${baselineArgsNote}${featureArgsNote}`;
const runUrl = `${context.serverUrl}/${context.repo.owner}/${context.repo.repo}/actions/runs/${context.runId}`;
const numRunners = parseInt(process.env.BENCH_RUNNERS) || 1;
@@ -483,13 +542,14 @@ jobs:
BENCH_SAMPLY: ${{ needs.reth-bench-ack.outputs.samply }}
BENCH_CORES: ${{ needs.reth-bench-ack.outputs.cores }}
BENCH_BIG_BLOCKS: ${{ needs.reth-bench-ack.outputs.big-blocks }}
BENCH_BAL: ${{ needs.reth-bench-ack.outputs.bal }}
BENCH_WAIT_TIME: ${{ needs.reth-bench-ack.outputs.wait-time }}
BENCH_BASELINE_ARGS: ${{ needs.reth-bench-ack.outputs.baseline-args }}
BENCH_FEATURE_ARGS: ${{ needs.reth-bench-ack.outputs.feature-args }}
BENCH_ABBA: ${{ needs.reth-bench-ack.outputs.abba }}
BENCH_OTLP: ${{ needs.reth-bench-ack.outputs.otlp }}
BENCH_COMMENT_ID: ${{ needs.reth-bench-ack.outputs.comment-id }}
BENCH_NO_SLACK: ${{ needs.reth-bench-ack.outputs.no-slack }}
BENCH_SLACK: ${{ needs.reth-bench-ack.outputs.slack }}
BENCH_NODE_BIN: ${{ needs.reth-bench-ack.outputs.big-blocks == 'true' && 'reth-bb' || 'reth' }}
BENCH_METRICS_ADDR: "127.0.0.1:9100"
BENCH_OTLP_TRACES_ENDPOINT: ${{ needs.reth-bench-ack.outputs.otlp != 'false' && secrets.BENCH_OTLP_TRACES_ENDPOINT || '' }}
@@ -544,10 +604,12 @@ jobs:
const baseline = '${{ needs.reth-bench-ack.outputs.baseline-name }}';
const feature = '${{ needs.reth-bench-ack.outputs.feature-name }}';
const samply = process.env.BENCH_SAMPLY === 'true';
const noSlack = process.env.BENCH_NO_SLACK === 'true';
const slack = process.env.BENCH_SLACK || 'always';
const bigBlocks = process.env.BENCH_BIG_BLOCKS === 'true';
const bal = process.env.BENCH_BAL || 'false';
const samplyNote = samply ? ', samply: `enabled`' : '';
const noSlackNote = noSlack ? ', no-slack' : '';
const slackNote = slack !== 'always' ? `, slack: \`${slack}\`` : '';
const balNote = bigBlocks && bal !== 'false' ? `, BAL: \`${bal}\`` : '';
const cores = process.env.BENCH_CORES || '0';
const coresNote = cores && cores !== '0' ? `, cores: \`${cores}\`` : '';
const abbaEnabled = (process.env.BENCH_ABBA || 'true') !== 'false';
@@ -561,7 +623,7 @@ jobs:
const featureArgsVal = process.env.BENCH_FEATURE_ARGS || '';
const featureArgsNote = featureArgsVal ? `, feature-args: \`${featureArgsVal}\`` : '';
const blocksDesc = bigBlocks ? 'blocks: `big`' : `${blocks} blocks, ${warmup} warmup blocks`;
core.exportVariable('BENCH_CONFIG', `**Config:** ${blocksDesc}, baseline: \`${baseline}\`, feature: \`${feature}\`${samplyNote}${noSlackNote}${coresNote}${abbaNote}${otlpNote}${waitTimeNote}${baselineArgsNote}${featureArgsNote}`);
core.exportVariable('BENCH_CONFIG', `**Config:** ${blocksDesc}, baseline: \`${baseline}\`, feature: \`${feature}\`${samplyNote}${slackNote}${balNote}${coresNote}${abbaNote}${otlpNote}${waitTimeNote}${baselineArgsNote}${featureArgsNote}`);
const { buildBody } = require('./.github/scripts/bench-update-status.js');
await github.rest.issues.updateComment({
@@ -1134,6 +1196,9 @@ jobs:
if [ -n "${BENCH_WAIT_TIME:-}" ]; then
SUMMARY_ARGS="$SUMMARY_ARGS --wait-time $BENCH_WAIT_TIME"
fi
if [ -n "${BENCH_BAL:-}" ] && [ "${BENCH_BAL}" != "false" ]; then
SUMMARY_ARGS="$SUMMARY_ARGS --bal-mode $BENCH_BAL"
fi
GRAFANA_URL='${{ steps.metrics.outputs.grafana-url }}'
if [ -n "$GRAFANA_URL" ]; then
SUMMARY_ARGS="$SUMMARY_ARGS --grafana-url $GRAFANA_URL"
@@ -1289,7 +1354,7 @@ jobs:
});
- name: Send Slack notification (success)
if: success() && env.BENCH_NO_SLACK != 'true'
if: success() && (env.BENCH_SLACK == 'always' || env.BENCH_SLACK == 'on-win')
uses: actions/github-script@v8
env:
SLACK_BENCH_BOT_TOKEN: ${{ secrets.SLACK_BENCH_BOT_TOKEN }}
@@ -1334,7 +1399,7 @@ jobs:
});
- name: Send Slack notification (failure)
if: failure()
if: failure() && env.BENCH_SLACK != 'never' && env.BENCH_SLACK != 'on-win'
uses: actions/github-script@v8
env:
SLACK_BENCH_BOT_TOKEN: ${{ secrets.SLACK_BENCH_BOT_TOKEN }}

623
Cargo.lock generated

File diff suppressed because it is too large Load Diff

View File

@@ -325,8 +325,8 @@ reth-cli = { path = "crates/cli/cli" }
reth-cli-commands = { path = "crates/cli/commands" }
reth-cli-runner = { path = "crates/cli/runner" }
reth-cli-util = { path = "crates/cli/util" }
reth-codecs = { version = "0.2.0", default-features = false }
reth-codecs-derive = "0.2.0"
reth-codecs = { version = "0.3.0", default-features = false }
reth-codecs-derive = "0.3.0"
reth-config = { path = "crates/config", default-features = false }
reth-consensus = { path = "crates/consensus/consensus", default-features = false }
reth-consensus-common = { path = "crates/consensus/common", default-features = false }
@@ -394,7 +394,7 @@ reth-payload-builder-primitives = { path = "crates/payload/builder-primitives" }
reth-payload-primitives = { path = "crates/payload/primitives" }
reth-payload-validator = { path = "crates/payload/validator" }
reth-payload-util = { path = "crates/payload/util" }
reth-primitives-traits = { version = "0.2.0", default-features = false }
reth-primitives-traits = { version = "0.3.0", default-features = false }
reth-provider = { path = "crates/storage/provider" }
reth-prune = { path = "crates/prune/prune" }
reth-prune-types = { path = "crates/prune/types", default-features = false }
@@ -410,7 +410,7 @@ reth-rpc-eth-types = { path = "crates/rpc/rpc-eth-types", default-features = fal
reth-rpc-layer = { path = "crates/rpc/rpc-layer" }
reth-rpc-server-types = { path = "crates/rpc/rpc-server-types" }
reth-rpc-convert = { path = "crates/rpc/rpc-convert" }
reth-rpc-traits = { version = "0.2.0", default-features = false }
reth-rpc-traits = { version = "0.3.0", default-features = false }
reth-stages = { path = "crates/stages/stages" }
reth-stages-api = { path = "crates/stages/api" }
reth-stages-types = { path = "crates/stages/types", default-features = false }
@@ -429,17 +429,17 @@ reth-trie-common = { path = "crates/trie/common", default-features = false }
reth-trie-db = { path = "crates/trie/db" }
reth-trie-parallel = { path = "crates/trie/parallel" }
reth-trie-sparse = { path = "crates/trie/sparse", default-features = false }
reth-zstd-compressors = { version = "0.2.0", default-features = false }
reth-zstd-compressors = { version = "0.3.0", default-features = false }
# revm
revm = { version = "36.0.0", default-features = false }
revm-bytecode = { version = "9.0.0", default-features = false }
revm-database = { version = "12.0.0", default-features = false }
revm-state = { version = "10.0.0", default-features = false }
revm-primitives = { version = "22.1.0", default-features = false }
revm-interpreter = { version = "34.0.0", default-features = false }
revm-database-interface = { version = "10.0.0", default-features = false }
revm-inspectors = "0.37.0"
revm = { version = "37.0.0", default-features = false }
revm-bytecode = { version = "10.0.0", default-features = false }
revm-database = { version = "13.0.0", default-features = false }
revm-state = { version = "11.0.0", default-features = false }
revm-primitives = { version = "23.0.0", default-features = false }
revm-interpreter = { version = "35.0.0", default-features = false }
revm-database-interface = { version = "11.0.0", default-features = false }
revm-inspectors = "0.38.0"
# eth
alloy-dyn-abi = "1.5.6"
@@ -449,7 +449,7 @@ alloy-sol-types = { version = "1.5.6", default-features = false }
alloy-chains = { version = "0.2.33", default-features = false }
alloy-eip2124 = { version = "0.2.0", default-features = false }
alloy-eip7928 = { version = "0.3.0", default-features = false }
alloy-evm = { version = "0.31.0", default-features = false }
alloy-evm = { version = "0.32.0", default-features = false }
alloy-rlp = { version = "0.3.13", default-features = false, features = ["core-net"] }
alloy-trie = { version = "0.9.4", default-features = false }
@@ -698,3 +698,19 @@ vergen-git2 = "9.1.0"
# networking
ipnet = "2.11"
[patch.crates-io]
revm = { git = "https://github.com/bluealloy/revm", rev = "89ecb25dbe49e1c3a10d99529e42f027d0bd2386" }
revm-bytecode = { git = "https://github.com/bluealloy/revm", rev = "89ecb25dbe49e1c3a10d99529e42f027d0bd2386" }
revm-database = { git = "https://github.com/bluealloy/revm", rev = "89ecb25dbe49e1c3a10d99529e42f027d0bd2386" }
revm-state = { git = "https://github.com/bluealloy/revm", rev = "89ecb25dbe49e1c3a10d99529e42f027d0bd2386" }
revm-primitives = { git = "https://github.com/bluealloy/revm", rev = "89ecb25dbe49e1c3a10d99529e42f027d0bd2386" }
revm-interpreter = { git = "https://github.com/bluealloy/revm", rev = "89ecb25dbe49e1c3a10d99529e42f027d0bd2386" }
revm-database-interface = { git = "https://github.com/bluealloy/revm", rev = "89ecb25dbe49e1c3a10d99529e42f027d0bd2386" }
revm-inspectors = { git = "https://github.com/paradigmxyz/revm-inspectors", rev = "c6f88bbe7186d863f4667dd43c42608eb7a8ba5c" }
alloy-evm = { git = "https://github.com/alloy-rs/evm", rev = "ff0bbec9ccaa818155e25003a77f4d73d350bbd7" }
reth-codecs = { git = "https://github.com/paradigmxyz/reth-core", rev = "6b12498871bc1b1d42c6dcf28968c271660de8c0" }
reth-codecs-derive = { git = "https://github.com/paradigmxyz/reth-core", rev = "6b12498871bc1b1d42c6dcf28968c271660de8c0" }
reth-primitives-traits = { git = "https://github.com/paradigmxyz/reth-core", rev = "6b12498871bc1b1d42c6dcf28968c271660de8c0" }
reth-rpc-traits = { git = "https://github.com/paradigmxyz/reth-core", rev = "6b12498871bc1b1d42c6dcf28968c271660de8c0" }
reth-zstd-compressors = { git = "https://github.com/paradigmxyz/reth-core", rev = "6b12498871bc1b1d42c6dcf28968c271660de8c0" }

View File

@@ -11,7 +11,7 @@ use alloy_eips::eip7685::Requests;
use alloy_evm::{
block::{
BlockExecutionError, BlockExecutionResult, BlockExecutor, BlockExecutorFactory,
BlockExecutorFor, ExecutableTx, OnStateHook, StateChangeSource, StateDB,
BlockExecutorFor, ExecutableTx, GasOutput, OnStateHook, StateChangeSource, StateDB,
},
eth::{EthBlockExecutionCtx, EthBlockExecutor, EthEvmContext, EthTxResult},
precompiles::PrecompilesMap,
@@ -386,7 +386,10 @@ where
self.inner_mut().execute_transaction_without_commit(tx)
}
fn commit_transaction(&mut self, output: Self::Result) -> Result<u64, BlockExecutionError> {
fn commit_transaction(
&mut self,
output: Self::Result,
) -> Result<GasOutput, BlockExecutionError> {
let gas_used = self.inner_mut().commit_transaction(output)?;
// Fix up cumulative_gas_used on the just-committed receipt so that

View File

@@ -31,8 +31,10 @@ reth-tracing.workspace = true
# alloy
alloy-consensus.workspace = true
alloy-eip7928 = { workspace = true, features = ["rlp"] }
alloy-eips.workspace = true
alloy-json-rpc.workspace = true
alloy-rlp.workspace = true
alloy-primitives = { workspace = true, features = ["rand"] }
alloy-provider = { workspace = true, features = ["engine-api", "pubsub", "reqwest-rustls-tls"], default-features = false }

View File

@@ -54,13 +54,8 @@ impl BenchContext {
}
}
// set up alloy client for blocks, retrying on 429/503 (default) and 502
let retry_policy =
RateLimitRetryPolicy::default().or(|err: &alloy_transport::TransportError| -> bool {
err.as_transport_err()
.and_then(|t| t.as_http_error())
.is_some_and(|e| e.status == 502)
});
// set up alloy client for blocks, retrying on any errors, whether HTTP or OS
let retry_policy = RateLimitRetryPolicy::default().or(|_| true);
let max_retries = bench_args.rpc_block_fetch_retries.as_max_retries();
let client = ClientBuilder::default()
.layer(RetryBackoffLayer::new_with_policy(max_retries, 800, u64::MAX, retry_policy))

View File

@@ -6,7 +6,12 @@
//! [`ExecutionData`] and environment switches at each block boundary.
use alloy_consensus::{TxEnvelope, TxReceipt};
use alloy_eips::{eip1559::BaseFeeParams, eip7840::BlobParams, Typed2718};
use alloy_eips::{
eip1559::BaseFeeParams,
eip7840::BlobParams,
eip7928::{AccountChanges, BlockAccessList, SlotChanges},
BlockNumberOrTag, Typed2718,
};
use alloy_primitives::{Bloom, Bytes, B256};
use alloy_provider::{network::AnyNetwork, Provider, RootProvider};
use alloy_rpc_client::ClientBuilder;
@@ -24,7 +29,7 @@ use reth_ethereum_cli::chainspec::EthereumChainSpecParser;
use reth_ethereum_primitives::Receipt;
use reth_primitives_traits::proofs;
use serde::{Deserialize, Serialize};
use std::future::Future;
use std::{collections::HashMap, future::Future};
use tracing::{info, warn};
/// A single transaction with its gas used and raw encoded bytes.
@@ -215,6 +220,9 @@ pub struct BigBlockPayload {
/// Big block data containing environment switches and prior block hashes.
#[serde(default)]
pub big_block_data: BigBlockData<ExecutionData>,
/// Flattened BAL across all constituent blocks, if requested during generation.
#[serde(default, skip_serializing_if = "Option::is_none")]
pub block_access_list: Option<BlockAccessList>,
}
/// `reth bench generate-big-block` command
@@ -252,6 +260,11 @@ pub struct Command {
/// Output directory for generated payloads.
#[arg(long, value_name = "OUTPUT_DIR")]
output_dir: std::path::PathBuf,
/// Query `eth_getBlockAccessListByBlockNumber` for each fetched block and persist
/// the flattened BAL on the stored payload.
#[arg(long, default_value_t = false)]
bal: bool,
}
impl Command {
@@ -273,6 +286,7 @@ impl Command {
from_block = self.from_block,
target_gas = self.target_gas,
num_big_blocks = self.num_big_blocks,
include_bal = self.bal,
chain = %chain_spec.chain(),
output_dir = %self.output_dir.display(),
"Generating big block payloads"
@@ -312,6 +326,7 @@ impl Command {
// Fetch consecutive blocks until the gas target is reached.
let mut blocks = Vec::new();
let mut block_receipts: Vec<Vec<Receipt>> = Vec::new();
let mut block_access_lists: Vec<Option<BlockAccessList>> = Vec::new();
let mut accumulated_block_gas: u64 = 0;
let mut reached_chain_tip = false;
@@ -338,6 +353,14 @@ impl Command {
Err(e) => return Err(e.into()),
};
let block_access_list = if self.bal {
Some(fetch_block_access_list(&provider, block_number).await.wrap_err_with(
|| format!("Failed to fetch BAL for block {block_number}"),
)?)
} else {
None
};
// Convert RPC receipts to consensus receipts
let consensus_receipts: Vec<Receipt> = receipts
.iter()
@@ -375,10 +398,14 @@ impl Command {
let execution_data = ExecutionData { payload, sidecar };
let block_gas = execution_data.payload.as_v1().gas_used;
let block_blob_gas =
execution_data.payload.as_v3().map(|v3| v3.blob_gas_used).unwrap_or(0);
info!(
target: "reth-bench",
block_number,
gas_used = block_gas,
blob_gas_used = block_blob_gas,
tx_count = execution_data.payload.transactions().len(),
receipts = consensus_receipts.len(),
"Fetched block"
@@ -387,6 +414,7 @@ impl Command {
accumulated_block_gas += block_gas;
blocks.push(execution_data);
block_receipts.push(consensus_receipts);
block_access_lists.push(block_access_list);
next_block += 1;
}
@@ -404,6 +432,7 @@ impl Command {
// Block 0 is the base
let mut base = blocks.remove(0);
let base_receipts = block_receipts.remove(0);
let mut merged_block_access_list = block_access_lists.remove(0);
let mut env_switches = Vec::new();
// Accumulate all receipts with corrected cumulative_gas_used.
@@ -439,12 +468,22 @@ impl Command {
let mut total_gas_limit = base.payload.as_v1().gas_limit;
// Concatenate transactions from subsequent blocks and build env_switches
for (block_data, receipts) in blocks.into_iter().zip(block_receipts) {
for ((block_data, receipts), block_access_list) in
blocks.into_iter().zip(block_receipts).zip(block_access_lists)
{
let block_v1 = block_data.payload.as_v1();
let block_gas = block_v1.gas_used;
total_gas_used += block_gas;
total_gas_limit += block_v1.gas_limit;
if let Some(block_access_list) = block_access_list {
merge_block_access_list(
merged_block_access_list.get_or_insert_with(Default::default),
block_access_list,
cumulative_tx_count as u64,
);
}
// Accumulate receipts with corrected cumulative_gas_used
all_receipts.extend(receipts.into_iter().map(|mut r| {
r.cumulative_gas_used += cumulative_gas_offset;
@@ -579,6 +618,7 @@ impl Command {
env_switches,
prior_block_hashes: accumulated_block_hashes.clone(),
},
block_access_list: merged_block_access_list,
};
// Accumulate real block hashes from this big block's env_switches for
@@ -610,6 +650,7 @@ impl Command {
total_gas_used = big_block.execution_data.payload.as_v1().gas_used,
env_switches = big_block.big_block_data.env_switches.len(),
prior_block_hashes = big_block.big_block_data.prior_block_hashes.len(),
bal_accounts = big_block.block_access_list.as_ref().map_or(0, Vec::len),
"Big block payload saved"
);
@@ -628,6 +669,85 @@ impl Command {
}
}
async fn fetch_block_access_list(
provider: &RootProvider<AnyNetwork>,
block_number: u64,
) -> eyre::Result<BlockAccessList> {
provider
.client()
.request("eth_getBlockAccessListByBlockNumber", (BlockNumberOrTag::Number(block_number),))
.await
.map_err(Into::into)
.and_then(|block_access_list: Option<BlockAccessList>| {
block_access_list.ok_or_else(|| eyre::eyre!("BAL not found for block {block_number}"))
})
}
fn merge_block_access_list(
merged: &mut BlockAccessList,
incoming: BlockAccessList,
tx_index_offset: u64,
) {
let mut account_positions = merged
.iter()
.enumerate()
.map(|(idx, account)| (account.address, idx))
.collect::<HashMap<_, _>>();
for mut account_changes in incoming {
shift_account_changes(&mut account_changes, tx_index_offset);
if let Some(&idx) = account_positions.get(&account_changes.address) {
merge_account_changes(&mut merged[idx], account_changes);
} else {
account_positions.insert(account_changes.address, merged.len());
merged.push(account_changes);
}
}
}
fn shift_account_changes(account_changes: &mut AccountChanges, tx_index_offset: u64) {
for slot_changes in &mut account_changes.storage_changes {
for change in &mut slot_changes.changes {
change.block_access_index += tx_index_offset;
}
}
for change in &mut account_changes.balance_changes {
change.block_access_index += tx_index_offset;
}
for change in &mut account_changes.nonce_changes {
change.block_access_index += tx_index_offset;
}
for change in &mut account_changes.code_changes {
change.block_access_index += tx_index_offset;
}
}
fn merge_account_changes(existing: &mut AccountChanges, incoming: AccountChanges) {
merge_slot_changes(&mut existing.storage_changes, incoming.storage_changes);
existing.storage_reads.extend(incoming.storage_reads);
existing.balance_changes.extend(incoming.balance_changes);
existing.nonce_changes.extend(incoming.nonce_changes);
existing.code_changes.extend(incoming.code_changes);
}
fn merge_slot_changes(existing: &mut Vec<SlotChanges>, incoming: Vec<SlotChanges>) {
let mut slot_positions = existing
.iter()
.enumerate()
.map(|(idx, slot_changes)| (slot_changes.slot, idx))
.collect::<HashMap<_, _>>();
for slot_changes in incoming {
if let Some(&idx) = slot_positions.get(&slot_changes.slot) {
existing[idx].changes.extend(slot_changes.changes);
} else {
slot_positions.insert(slot_changes.slot, existing.len());
existing.push(slot_changes);
}
}
}
/// Computes the block hash for an [`ExecutionData`] by converting it to a raw block
/// and hashing the header.
pub fn compute_payload_block_hash(data: &ExecutionData) -> eyre::Result<B256> {
@@ -638,3 +758,94 @@ pub fn compute_payload_block_hash(data: &ExecutionData) -> eyre::Result<B256> {
.wrap_err("failed to convert payload to block for hash computation")?;
Ok(block.header.hash_slow())
}
#[cfg(test)]
mod tests {
use super::*;
use alloy_eips::eip7928::{BalanceChange, CodeChange, NonceChange, StorageChange};
use alloy_primitives::{Address, U256};
#[test]
fn merge_block_access_list_offsets_and_merges_accounts() {
let shared = Address::repeat_byte(0x11);
let other = Address::repeat_byte(0x22);
let mut merged = vec![AccountChanges {
address: shared,
storage_changes: vec![SlotChanges::new(
U256::from(1),
vec![StorageChange::new(0, U256::from(10))],
)],
storage_reads: vec![U256::from(3)],
balance_changes: vec![BalanceChange::new(1, U256::from(100))],
nonce_changes: vec![NonceChange::new(2, 7)],
code_changes: vec![],
}];
let incoming = vec![
AccountChanges {
address: shared,
storage_changes: vec![
SlotChanges::new(U256::from(1), vec![StorageChange::new(1, U256::from(20))]),
SlotChanges::new(U256::from(2), vec![StorageChange::new(2, U256::from(30))]),
],
storage_reads: vec![U256::from(4)],
balance_changes: vec![BalanceChange::new(0, U256::from(150))],
nonce_changes: vec![NonceChange::new(2, 8)],
code_changes: vec![CodeChange::new(1, Bytes::from_static(&[0xaa]))],
},
AccountChanges {
address: other,
storage_changes: vec![SlotChanges::new(
U256::from(9),
vec![StorageChange::new(0, U256::from(90))],
)],
storage_reads: vec![],
balance_changes: vec![],
nonce_changes: vec![],
code_changes: vec![],
},
];
merge_block_access_list(&mut merged, incoming, 3);
assert_eq!(merged.len(), 2);
let shared = &merged[0];
assert_eq!(shared.storage_reads, vec![U256::from(3), U256::from(4)]);
assert_eq!(
shared
.balance_changes
.iter()
.map(|change| change.block_access_index)
.collect::<Vec<_>>(),
vec![1, 3]
);
assert_eq!(
shared.nonce_changes.iter().map(|change| change.block_access_index).collect::<Vec<_>>(),
vec![2, 5]
);
assert_eq!(shared.code_changes[0].block_access_index, 4);
let slot_one = shared
.storage_changes
.iter()
.find(|slot_changes| slot_changes.slot == U256::from(1))
.unwrap();
assert_eq!(
slot_one.changes.iter().map(|change| change.block_access_index).collect::<Vec<_>>(),
vec![0, 4]
);
let slot_two = shared
.storage_changes
.iter()
.find(|slot_changes| slot_changes.slot == U256::from(2))
.unwrap();
assert_eq!(slot_two.changes[0].block_access_index, 5);
let other = &merged[1];
assert_eq!(other.address, Address::repeat_byte(0x22));
assert_eq!(other.storage_changes[0].changes[0].block_access_index, 3);
}
}

View File

@@ -3,7 +3,7 @@
use crate::{
authenticated_transport::AuthenticatedTransportConnect,
bench::{
generate_big_block::BigBlockPayload,
generate_big_block::{compute_payload_block_hash, BigBlockPayload},
helpers::parse_duration,
metrics_scraper::MetricsScraper,
output::{
@@ -12,12 +12,14 @@ use crate::{
},
valid_payload::{call_forkchoice_updated_with_reth, call_new_payload_with_reth},
};
use alloy_eip7928::bal::Bal;
use alloy_eips::eip7928::BlockAccessList;
use alloy_primitives::B256;
use alloy_provider::{network::AnyNetwork, Provider, RootProvider};
use alloy_rpc_client::ClientBuilder;
use alloy_rpc_types_engine::{
CancunPayloadFields, ExecutionData, ExecutionPayloadEnvelopeV4, ExecutionPayloadSidecar,
ForkchoiceState, JwtSecret, PraguePayloadFields,
CancunPayloadFields, ExecutionData, ExecutionPayload, ExecutionPayloadEnvelopeV4,
ExecutionPayloadSidecar, ExecutionPayloadV4, ForkchoiceState, JwtSecret, PraguePayloadFields,
};
use clap::Parser;
use eyre::Context;
@@ -83,6 +85,14 @@ pub struct Command {
#[arg(long, default_value = "false", verbatim_doc_comment)]
reth_new_payload: bool,
/// Forward embedded block access lists to `reth_newPayload` when payload files contain them.
///
/// Disabled by default so the same payload set can be replayed with or without BALs.
///
/// Requires `--reth-new-payload`.
#[arg(long, default_value = "false", verbatim_doc_comment, requires = "reth_new_payload")]
bal: bool,
/// Control when `reth_newPayload` waits for in-flight persistence.
///
/// Accepts `always` (default — wait on every block), `never`, or a number N
@@ -126,6 +136,8 @@ struct LoadedPayload {
block_hash: B256,
/// Big block data containing environment switches and prior block hashes.
big_block_data: BigBlockData<ExecutionData>,
/// Optional BAL flattened into the payload file.
block_access_list: Option<BlockAccessList>,
}
impl Command {
@@ -139,6 +151,9 @@ impl Command {
}
if self.reth_new_payload {
info!("Using reth_newPayload and reth_forkchoiceUpdated endpoints");
if self.bal {
info!(target: "reth-bench", "Forwarding embedded block_access_list data");
}
}
let mut metrics_scraper = MetricsScraper::maybe_new(self.metrics_url.clone());
@@ -185,10 +200,13 @@ impl Command {
}
info!(target: "reth-bench", count = payloads.len(), "Loaded main payloads from disk");
let has_env_switches = payloads.iter().any(|p| !p.big_block_data.env_switches.is_empty());
let has_block_access_lists = payloads.iter().any(|p| {
p.block_access_list.as_ref().is_some_and(|bal: &BlockAccessList| !bal.is_empty())
});
// If any payload has env_switches but we're not using reth_newPayload, warn the user
if !self.reth_new_payload {
let has_env_switches =
payloads.iter().any(|p| !p.big_block_data.env_switches.is_empty());
if has_env_switches {
warn!(
target: "reth-bench",
@@ -196,6 +214,18 @@ impl Command {
env_switches are only supported with reth_newPayload and will be ignored."
);
}
if has_block_access_lists {
warn!(
target: "reth-bench",
"Payloads contain block_access_list data but --reth-new-payload is not set. \
BALs are only forwarded with reth_newPayload and will be ignored."
);
}
} else if has_block_access_lists && !self.bal {
info!(
target: "reth-bench",
"Payloads contain block_access_list data but --bal is not set. BALs will be ignored."
);
}
let mut parent_hash = initial_parent_hash;
@@ -205,7 +235,7 @@ impl Command {
for (i, payload) in payloads.iter().enumerate() {
let execution_data = &payload.execution_data;
let block_hash = payload.block_hash;
let mut block_hash = payload.block_hash;
let v1 = execution_data.payload.as_v1();
let gas_used = v1.gas_used;
@@ -243,10 +273,39 @@ impl Command {
.wait_for_persistence
.unwrap_or(WaitForPersistence::Never)
.rpc_value(block_number);
// Inject sidecar BAL into the inline V4 payload field when --bal is set.
// If the payload is not already V4 we upgrade it (V3→V4) so the BAL
// can be carried inline. This changes the block hash, so we recompute
// it and patch parent_hash to maintain the chain.
let mut execution_data = execution_data.clone();
if self.bal &&
let Some(bal) = &payload.block_access_list
{
let encoded_bal: alloy_primitives::Bytes =
alloy_rlp::encode(Bal::from(bal.clone())).into();
// Upgrade to V4 if necessary, then set the BAL field.
if execution_data.payload.as_v4().is_none() {
execution_data.payload = upgrade_to_v4(execution_data.payload, encoded_bal);
} else {
execution_data.payload.as_v4_mut().unwrap().block_access_list = encoded_bal;
}
// Patch parent_hash so this block chains off the (possibly
// rehashed) previous block.
execution_data.payload.as_v1_mut().parent_hash = parent_hash;
// Recompute block hash after payload modification and update
// the hash stored in the payload itself.
block_hash = compute_payload_block_hash(&execution_data)?;
execution_data.payload.as_v1_mut().block_hash = block_hash;
}
(
None,
serde_json::to_value((
RethNewPayloadInput::ExecutionData(execution_data.clone()),
RethNewPayloadInput::ExecutionData(execution_data),
wait_for_persistence,
self.no_wait_for_caches.then_some(false),
big_block_data_param,
@@ -417,26 +476,27 @@ impl Command {
.wrap_err_with(|| format!("Failed to read {:?}", path))?;
// Try BigBlockPayload first, then fall back to legacy ExecutionPayloadEnvelopeV4
let (execution_data, big_block_data) =
if let Ok(big_block) = serde_json::from_str::<BigBlockPayload>(&content) {
(big_block.execution_data, big_block.big_block_data)
} else {
let envelope: ExecutionPayloadEnvelopeV4 = serde_json::from_str(&content)
.wrap_err_with(|| format!("Failed to parse {:?}", path))?;
let execution_data = ExecutionData {
payload: envelope.envelope_inner.execution_payload.clone().into(),
sidecar: ExecutionPayloadSidecar::v4(
CancunPayloadFields {
versioned_hashes: Vec::new(),
parent_beacon_block_root: B256::ZERO,
},
PraguePayloadFields {
requests: envelope.execution_requests.clone().into(),
},
),
};
(execution_data, BigBlockData::default())
let (execution_data, big_block_data, block_access_list) = if let Ok(big_block) =
serde_json::from_str::<BigBlockPayload>(&content)
{
(big_block.execution_data, big_block.big_block_data, big_block.block_access_list)
} else {
let envelope: ExecutionPayloadEnvelopeV4 = serde_json::from_str(&content)
.wrap_err_with(|| format!("Failed to parse {:?}", path))?;
let execution_data = ExecutionData {
payload: envelope.envelope_inner.execution_payload.clone().into(),
sidecar: ExecutionPayloadSidecar::v4(
CancunPayloadFields {
versioned_hashes: Vec::new(),
parent_beacon_block_root: B256::ZERO,
},
PraguePayloadFields {
requests: envelope.execution_requests.clone().into(),
},
),
};
(execution_data, BigBlockData::default(), None)
};
let block_hash = execution_data.payload.as_v1().block_hash;
@@ -446,13 +506,48 @@ impl Command {
block_hash = %block_hash,
env_switches = big_block_data.env_switches.len(),
prior_block_hashes = big_block_data.prior_block_hashes.len(),
bal_accounts = block_access_list.as_ref().map_or(0, Vec::len),
path = %path.display(),
"Loaded payload"
);
payloads.push(LoadedPayload { index, execution_data, block_hash, big_block_data });
payloads.push(LoadedPayload {
index,
execution_data,
block_hash,
big_block_data,
block_access_list,
});
}
Ok(payloads)
}
}
/// Upgrades an [`ExecutionPayload`] to V4 by wrapping the inner V3 payload (constructing
/// default V2/V3 layers for V1 payloads if needed) and setting the provided BAL bytes.
fn upgrade_to_v4(
payload: ExecutionPayload,
block_access_list: alloy_primitives::Bytes,
) -> ExecutionPayload {
use alloy_rpc_types_engine::{ExecutionPayloadV2, ExecutionPayloadV3};
let v3 = match payload {
ExecutionPayload::V4(_) => unreachable!("caller checks as_v4().is_none()"),
ExecutionPayload::V3(v3) => v3,
ExecutionPayload::V2(v2) => {
ExecutionPayloadV3 { payload_inner: v2, blob_gas_used: 0, excess_blob_gas: 0 }
}
ExecutionPayload::V1(v1) => ExecutionPayloadV3 {
payload_inner: ExecutionPayloadV2 { payload_inner: v1, withdrawals: Vec::new() },
blob_gas_used: 0,
excess_blob_gas: 0,
},
};
ExecutionPayload::V4(ExecutionPayloadV4 {
payload_inner: v3,
block_access_list,
slot_number: 0,
})
}

View File

@@ -28,7 +28,7 @@ use alloy_consensus::{
};
use alloy_eips::{
eip1559::INITIAL_BASE_FEE, eip7685::EMPTY_REQUESTS_HASH, eip7840::BlobParams,
eip7892::BlobScheduleBlobParams,
eip7892::BlobScheduleBlobParams, eip7928::EMPTY_BLOCK_ACCESS_LIST_HASH,
};
use alloy_genesis::{ChainConfig, Genesis};
use alloy_primitives::{address, b256, Address, BlockNumber, B256, U256};
@@ -76,6 +76,18 @@ pub fn make_genesis_header(genesis: &Genesis, hardforks: &ChainHardforks) -> Hea
.active_at_timestamp(genesis.timestamp)
.then_some(EMPTY_REQUESTS_HASH);
// If Amsterdam is activated at genesis we set block access list hash to an empty bal hash
let block_access_list_hash = hardforks
.fork(EthereumHardfork::Amsterdam)
.active_at_timestamp(genesis.timestamp)
.then_some(EMPTY_BLOCK_ACCESS_LIST_HASH);
// If Amsterdam is activated at genesis we set slot number to 0
let slot_number = hardforks
.fork(EthereumHardfork::Amsterdam)
.active_at_timestamp(genesis.timestamp)
.then_some(0);
Header {
number: genesis.number.unwrap_or_default(),
parent_hash: genesis.parent_hash.unwrap_or_default(),
@@ -93,6 +105,8 @@ pub fn make_genesis_header(genesis: &Genesis, hardforks: &ChainHardforks) -> Hea
blob_gas_used,
excess_blob_gas,
requests_hash,
block_access_list_hash,
slot_number,
..Default::default()
}
}
@@ -297,6 +311,7 @@ pub fn create_chain_config(
cancun_time: timestamp(EthereumHardfork::Cancun),
prague_time: timestamp(EthereumHardfork::Prague),
osaka_time: timestamp(EthereumHardfork::Osaka),
amsterdam_time: timestamp(EthereumHardfork::Amsterdam),
bpo1_time: timestamp(EthereumHardfork::Bpo1),
bpo2_time: timestamp(EthereumHardfork::Bpo2),
bpo3_time: timestamp(EthereumHardfork::Bpo3),
@@ -880,6 +895,7 @@ impl From<Genesis> for ChainSpec {
(EthereumHardfork::Bpo3.boxed(), genesis.config.bpo3_time),
(EthereumHardfork::Bpo4.boxed(), genesis.config.bpo4_time),
(EthereumHardfork::Bpo5.boxed(), genesis.config.bpo5_time),
(EthereumHardfork::Amsterdam.boxed(), genesis.config.amsterdam_time),
];
let mut time_hardforks = time_hardfork_opts
@@ -1186,6 +1202,19 @@ impl ChainSpecBuilder {
self
}
/// Enable Amsterdam at genesis.
pub fn amsterdam_activated(mut self) -> Self {
self = self.osaka_activated();
self.hardforks.insert(EthereumHardfork::Amsterdam, ForkCondition::Timestamp(0));
self
}
/// Enable Amsterdam at the given timestamp.
pub fn with_amsterdam_at(mut self, timestamp: u64) -> Self {
self.hardforks.insert(EthereumHardfork::Amsterdam, ForkCondition::Timestamp(timestamp));
self
}
/// Build the resulting [`ChainSpec`].
///
/// # Panics

View File

@@ -248,6 +248,7 @@ fn selection_to_prune_mode(
ComponentSelection::Distance(d) => {
Some(PruneMode::Distance(min_distance.map_or(d, |min| d.max(min))))
}
ComponentSelection::Since(block) => Some(PruneMode::Before(block)),
ComponentSelection::None => Some(min_distance.map_or(PruneMode::Full, PruneMode::Distance)),
}
}
@@ -453,6 +454,36 @@ mod tests {
assert_eq!(config.prune.segments.storage_history, Some(PruneMode::Distance(10_064)));
}
#[test]
fn selections_since_maps_to_before_prune_mode() {
let mut selections = BTreeMap::new();
selections.insert(SnapshotComponentType::State, ComponentSelection::All);
selections.insert(SnapshotComponentType::Headers, ComponentSelection::All);
selections
.insert(SnapshotComponentType::Transactions, ComponentSelection::Since(15_537_394));
selections.insert(SnapshotComponentType::Receipts, ComponentSelection::Since(15_537_394));
selections.insert(
SnapshotComponentType::AccountChangesets,
ComponentSelection::Since(15_537_394),
);
selections.insert(
SnapshotComponentType::StorageChangesets,
ComponentSelection::Since(15_537_394),
);
let config = config_for_selections(
&selections,
&empty_manifest(),
None,
None::<&reth_chainspec::ChainSpec>,
);
assert_eq!(config.prune.segments.bodies_history, Some(PruneMode::Before(15_537_394)));
assert_eq!(config.prune.segments.receipts, Some(PruneMode::Before(15_537_394)));
assert_eq!(config.prune.segments.account_history, Some(PruneMode::Before(15_537_394)));
assert_eq!(config.prune.segments.storage_history, Some(PruneMode::Before(15_537_394)));
}
#[test]
fn full_preset_matches_default_full_prune_config() {
let mut selections = BTreeMap::new();

View File

@@ -119,6 +119,9 @@ pub enum ComponentSelection {
/// Download only the most recent chunks covering at least `distance` blocks.
/// Maps to `PruneMode::Distance(distance)` in the generated config.
Distance(u64),
/// Download chunks starting at the specified block number.
/// Maps to `PruneMode::Before(block)` in the generated config.
Since(u64),
/// Don't download this component at all.
/// Maps to `PruneMode::Full` for tx-based segments, or a minimal distance for others.
None,
@@ -129,6 +132,7 @@ impl std::fmt::Display for ComponentSelection {
match self {
Self::All => write!(f, "All"),
Self::Distance(d) => write!(f, "Last {d} blocks"),
Self::Since(block) => write!(f, "Since block {block}"),
Self::None => write!(f, "None"),
}
}
@@ -936,6 +940,7 @@ mod tests {
fn component_selection_display() {
assert_eq!(ComponentSelection::All.to_string(), "All");
assert_eq!(ComponentSelection::Distance(10_064).to_string(), "Last 10064 blocks");
assert_eq!(ComponentSelection::Since(15_537_394).to_string(), "Since block 15537394");
assert_eq!(ComponentSelection::None.to_string(), "None");
}

View File

@@ -5,7 +5,7 @@ mod tui;
use crate::common::EnvironmentArgs;
use blake3::Hasher;
use clap::Parser;
use clap::{builder::RangedU64ValueParser, Parser};
use config_gen::{config_for_selections, write_config};
use eyre::{Result, WrapErr};
use futures::stream::{self, StreamExt};
@@ -48,6 +48,7 @@ const RETH_SNAPSHOTS_API_URL: &str = "https://snapshots.reth.rs/api/snapshots";
const EXTENSION_TAR_LZ4: &str = ".tar.lz4";
const EXTENSION_TAR_ZSTD: &str = ".tar.zst";
const DOWNLOAD_CACHE_DIR: &str = ".download-cache";
const STATIC_FILES_PREFIX: &str = "static_files/";
/// Maximum number of concurrent archive downloads.
const MAX_CONCURRENT_DOWNLOADS: usize = 8;
@@ -59,6 +60,7 @@ pub(crate) enum SelectionPreset {
Archive,
}
#[derive(Debug)]
struct ResolvedComponents {
selections: BTreeMap<SnapshotComponentType, ComponentSelection>,
preset: Option<SelectionPreset>,
@@ -218,18 +220,42 @@ pub struct DownloadCommand<C: ChainSpecParser> {
#[arg(long, value_name = "PATH", conflicts_with_all = ["url", "manifest_url"])]
manifest_path: Option<PathBuf>,
/// Include transaction static files.
#[arg(long, conflicts_with_all = ["minimal", "full", "archive"])]
/// Include all transaction static files.
#[arg(long, conflicts_with_all = ["with_txs_since", "with_txs_distance", "minimal", "full", "archive"])]
with_txs: bool,
/// Include receipt static files.
#[arg(long, conflicts_with_all = ["minimal", "full", "archive"])]
/// Include transaction static files starting at the specified block.
#[arg(long, value_name = "BLOCK_NUMBER", conflicts_with_all = ["with_txs", "with_txs_distance", "minimal", "full", "archive"])]
with_txs_since: Option<u64>,
/// Include transaction static files covering the last N blocks.
#[arg(long, value_name = "BLOCKS", value_parser = RangedU64ValueParser::<u64>::new().range(1..), conflicts_with_all = ["with_txs", "with_txs_since", "minimal", "full", "archive"])]
with_txs_distance: Option<u64>,
/// Include all receipt static files.
#[arg(long, conflicts_with_all = ["with_receipts_since", "with_receipts_distance", "minimal", "full", "archive"])]
with_receipts: bool,
/// Include account and storage history static files.
#[arg(long, alias = "with-changesets", conflicts_with_all = ["minimal", "full", "archive"])]
/// Include receipt static files starting at the specified block.
#[arg(long, value_name = "BLOCK_NUMBER", conflicts_with_all = ["with_receipts", "with_receipts_distance", "minimal", "full", "archive"])]
with_receipts_since: Option<u64>,
/// Include receipt static files covering the last N blocks.
#[arg(long, value_name = "BLOCKS", value_parser = RangedU64ValueParser::<u64>::new().range(1..), conflicts_with_all = ["with_receipts", "with_receipts_since", "minimal", "full", "archive"])]
with_receipts_distance: Option<u64>,
/// Include all account and storage history static files.
#[arg(long, alias = "with-changesets", conflicts_with_all = ["with_state_history_since", "with_state_history_distance", "minimal", "full", "archive"])]
with_state_history: bool,
/// Include account and storage history static files starting at the specified block.
#[arg(long, value_name = "BLOCK_NUMBER", conflicts_with_all = ["with_state_history", "with_state_history_distance", "minimal", "full", "archive"])]
with_state_history_since: Option<u64>,
/// Include account and storage history static files covering the last N blocks.
#[arg(long, value_name = "BLOCKS", value_parser = RangedU64ValueParser::<u64>::new().range(1..), conflicts_with_all = ["with_state_history", "with_state_history_since", "minimal", "full", "archive"])]
with_state_history_distance: Option<u64>,
/// Include transaction sender static files. Requires `--with-txs`.
#[arg(long, requires = "with_txs", conflicts_with_all = ["minimal", "full", "archive"])]
with_senders: bool,
@@ -239,15 +265,15 @@ pub struct DownloadCommand<C: ChainSpecParser> {
with_rocksdb: bool,
/// Download all available components (archive node, no pruning).
#[arg(long, alias = "all", conflicts_with_all = ["with_txs", "with_receipts", "with_state_history", "with_senders", "with_rocksdb", "minimal", "full"])]
#[arg(long, alias = "all", conflicts_with_all = ["with_txs", "with_txs_since", "with_txs_distance", "with_receipts", "with_receipts_since", "with_receipts_distance", "with_state_history", "with_state_history_since", "with_state_history_distance", "with_senders", "with_rocksdb", "minimal", "full"])]
archive: bool,
/// Download the minimal component set (same default as --non-interactive).
#[arg(long, conflicts_with_all = ["with_txs", "with_receipts", "with_state_history", "with_senders", "with_rocksdb", "archive", "full"])]
#[arg(long, conflicts_with_all = ["with_txs", "with_txs_since", "with_txs_distance", "with_receipts", "with_receipts_since", "with_receipts_distance", "with_state_history", "with_state_history_since", "with_state_history_distance", "with_senders", "with_rocksdb", "archive", "full"])]
minimal: bool,
/// Download the full node component set (matches default full prune settings).
#[arg(long, conflicts_with_all = ["with_txs", "with_receipts", "with_state_history", "with_senders", "with_rocksdb", "archive", "minimal"])]
#[arg(long, conflicts_with_all = ["with_txs", "with_txs_since", "with_txs_distance", "with_receipts", "with_receipts_since", "with_receipts_distance", "with_state_history", "with_state_history_since", "with_state_history_distance", "with_senders", "with_rocksdb", "archive", "minimal"])]
full: bool,
/// Skip optional RocksDB indices even when archive components are selected.
@@ -299,6 +325,11 @@ impl<C: ChainSpecParser<ChainSpec: EthChainSpec + EthereumHardforks>> DownloadCo
return Ok(());
}
// Resolve custom static files directory (None when using the default).
let default_sf = data_dir.data_dir().join("static_files");
let custom_sf = data_dir.static_files();
let static_files_dir = if custom_sf != default_sf { Some(custom_sf) } else { None };
// Legacy single-URL mode: download one archive and extract it
if let Some(ref url) = self.url {
info!(target: "reth::cli",
@@ -310,6 +341,7 @@ impl<C: ChainSpecParser<ChainSpec: EthChainSpec + EthereumHardforks>> DownloadCo
stream_and_extract(
url,
data_dir.data_dir(),
static_files_dir,
None,
self.resumable,
cancel_token.clone(),
@@ -348,6 +380,7 @@ impl<C: ChainSpecParser<ChainSpec: EthChainSpec + EthereumHardforks>> DownloadCo
let distance = match sel {
ComponentSelection::All => None,
ComponentSelection::Distance(d) => Some(*d),
ComponentSelection::Since(block) => Some(manifest.block - block + 1),
ComponentSelection::None => continue,
};
let descriptors = manifest.archive_descriptors_for_distance(*ty, distance);
@@ -398,11 +431,15 @@ impl<C: ChainSpecParser<ChainSpec: EthChainSpec + EthereumHardforks>> DownloadCo
.map(|(ty, sel)| match sel {
ComponentSelection::All => manifest.size_for_distance(*ty, None),
ComponentSelection::Distance(d) => manifest.size_for_distance(*ty, Some(*d)),
ComponentSelection::Since(block) => {
manifest.size_for_distance(*ty, Some(manifest.block - block + 1))
}
ComponentSelection::None => 0,
})
.sum();
let startup_summary = summarize_download_startup(&all_downloads, target_dir)?;
let startup_summary =
summarize_download_startup(&all_downloads, target_dir, static_files_dir.as_deref())?;
info!(target: "reth::cli",
reusable = startup_summary.reusable,
needs_download = startup_summary.needs_download,
@@ -419,12 +456,14 @@ impl<C: ChainSpecParser<ChainSpec: EthChainSpec + EthereumHardforks>> DownloadCo
let progress_handle = spawn_progress_display(Arc::clone(&shared));
let target = target_dir.to_path_buf();
let sf_dir = static_files_dir;
let cache_dir = download_cache_dir;
let resumable = self.resumable;
let download_concurrency = self.download_concurrency.max(1);
let results: Vec<Result<()>> = stream::iter(all_downloads)
.map(|planned| {
let dir = target.clone();
let sf = sf_dir.clone();
let cache = cache_dir.clone();
let sp = Arc::clone(&shared);
let ct = cancel_token.clone();
@@ -432,6 +471,7 @@ impl<C: ChainSpecParser<ChainSpec: EthChainSpec + EthereumHardforks>> DownloadCo
process_modular_archive(
planned,
&dir,
sf.as_deref(),
cache.as_deref(),
Some(sp),
resumable,
@@ -486,7 +526,8 @@ impl<C: ChainSpecParser<ChainSpec: EthChainSpec + EthereumHardforks>> DownloadCo
tx.commit()?;
}
info!(target: "reth::cli", "Snapshot download complete. Run `reth node` to start syncing.");
let start_command = startup_node_command::<C>(self.env.chain.as_ref());
info!(target: "reth::cli", "Snapshot download complete. Run `{}` to start syncing.", start_command);
Ok(())
}
@@ -526,13 +567,41 @@ impl<C: ChainSpecParser<ChainSpec: EthChainSpec + EthereumHardforks>> DownloadCo
}
let has_explicit_flags = self.with_txs ||
self.with_txs_since.is_some() ||
self.with_txs_distance.is_some() ||
self.with_receipts ||
self.with_receipts_since.is_some() ||
self.with_receipts_distance.is_some() ||
self.with_state_history ||
self.with_state_history_since.is_some() ||
self.with_state_history_distance.is_some() ||
self.with_senders ||
self.with_rocksdb;
if has_explicit_flags {
let mut selections = BTreeMap::new();
let tx_selection = explicit_component_selection(
"--with-txs-since",
self.with_txs,
self.with_txs_since,
self.with_txs_distance,
manifest.block,
)?;
let receipt_selection = explicit_component_selection(
"--with-receipts-since",
self.with_receipts,
self.with_receipts_since,
self.with_receipts_distance,
manifest.block,
)?;
let state_history_selection = explicit_component_selection(
"--with-state-history-since",
self.with_state_history,
self.with_state_history_since,
self.with_state_history_distance,
manifest.block,
)?;
// Required components always All
if available(SnapshotComponentType::State) {
selections.insert(SnapshotComponentType::State, ComponentSelection::All);
@@ -540,20 +609,22 @@ impl<C: ChainSpecParser<ChainSpec: EthChainSpec + EthereumHardforks>> DownloadCo
if available(SnapshotComponentType::Headers) {
selections.insert(SnapshotComponentType::Headers, ComponentSelection::All);
}
if self.with_txs && available(SnapshotComponentType::Transactions) {
selections.insert(SnapshotComponentType::Transactions, ComponentSelection::All);
if let Some(selection) = tx_selection &&
available(SnapshotComponentType::Transactions)
{
selections.insert(SnapshotComponentType::Transactions, selection);
}
if self.with_receipts && available(SnapshotComponentType::Receipts) {
selections.insert(SnapshotComponentType::Receipts, ComponentSelection::All);
if let Some(selection) = receipt_selection &&
available(SnapshotComponentType::Receipts)
{
selections.insert(SnapshotComponentType::Receipts, selection);
}
if self.with_state_history {
if let Some(selection) = state_history_selection {
if available(SnapshotComponentType::AccountChangesets) {
selections
.insert(SnapshotComponentType::AccountChangesets, ComponentSelection::All);
selections.insert(SnapshotComponentType::AccountChangesets, selection);
}
if available(SnapshotComponentType::StorageChangesets) {
selections
.insert(SnapshotComponentType::StorageChangesets, ComponentSelection::All);
selections.insert(SnapshotComponentType::StorageChangesets, selection);
}
}
if self.with_senders && available(SnapshotComponentType::TransactionSenders) {
@@ -641,9 +712,7 @@ impl<C: ChainSpecParser<ChainSpec: EthChainSpec + EthereumHardforks>> DownloadCo
.ethereum_fork_activation(EthereumHardfork::Paris)
.block_number()
{
Some(paris) if snapshot_block >= paris => {
ComponentSelection::Distance(snapshot_block - paris + 1)
}
Some(paris) if snapshot_block >= paris => ComponentSelection::Since(paris),
Some(_) => ComponentSelection::None,
None => ComponentSelection::All,
}
@@ -690,7 +759,7 @@ fn selection_from_prune_mode(mode: Option<PruneMode>, snapshot_block: u64) -> Co
Some(PruneMode::Distance(d)) => ComponentSelection::Distance(d),
Some(PruneMode::Before(block)) => {
if snapshot_block >= block {
ComponentSelection::Distance(snapshot_block - block + 1)
ComponentSelection::Since(block)
} else {
ComponentSelection::None
}
@@ -698,6 +767,27 @@ fn selection_from_prune_mode(mode: Option<PruneMode>, snapshot_block: u64) -> Co
}
}
fn explicit_component_selection(
since_flag_name: &str,
all: bool,
since: Option<u64>,
distance: Option<u64>,
snapshot_block: u64,
) -> Result<Option<ComponentSelection>> {
if all {
return Ok(Some(ComponentSelection::All));
}
if let Some(since) = since {
if since > snapshot_block {
eyre::bail!("{since_flag_name} {since} is beyond the snapshot block {snapshot_block}");
}
return Ok(Some(ComponentSelection::Since(since)));
}
Ok(distance.map(ComponentSelection::Distance))
}
/// If all data components (txs, receipts, changesets) are `All`, automatically
/// include hidden archive-only components when available in the manifest.
fn inject_archive_only_components(
@@ -736,6 +826,66 @@ fn should_reset_index_stage_checkpoints(
!matches!(selections.get(&SnapshotComponentType::RocksdbIndices), Some(ComponentSelection::All))
}
fn startup_node_command<C>(chain_spec: &C::ChainSpec) -> String
where
C: ChainSpecParser,
C::ChainSpec: EthChainSpec,
{
startup_node_command_for_binary::<C>(&current_binary_name(), chain_spec)
}
fn startup_node_command_for_binary<C>(binary_name: &str, chain_spec: &C::ChainSpec) -> String
where
C: ChainSpecParser,
C::ChainSpec: EthChainSpec,
{
let mut command = format!("{binary_name} node");
if let Some(chain_arg) = startup_chain_arg::<C>(chain_spec) {
command.push_str(" --chain ");
command.push_str(&chain_arg);
}
command
}
fn current_binary_name() -> String {
std::env::args_os()
.next()
.map(PathBuf::from)
.and_then(|path| path.file_stem().map(|name| name.to_owned()))
.and_then(|name| name.into_string().ok())
.filter(|name| !name.is_empty())
.unwrap_or_else(|| "reth".to_string())
}
fn startup_chain_arg<C>(chain_spec: &C::ChainSpec) -> Option<String>
where
C: ChainSpecParser,
C::ChainSpec: EthChainSpec,
{
let current_chain = chain_spec.chain();
let current_genesis_hash = chain_spec.genesis_hash();
let default_chain = C::default_value().and_then(|chain_name| C::parse(chain_name).ok());
if default_chain.as_ref().is_some_and(|default_chain| {
default_chain.chain() == current_chain &&
default_chain.genesis_hash() == current_genesis_hash
}) {
return None;
}
C::SUPPORTED_CHAINS
.iter()
.find_map(|chain_name| {
let parsed_chain = C::parse(chain_name).ok()?;
(parsed_chain.chain() == current_chain &&
parsed_chain.genesis_hash() == current_genesis_hash)
.then(|| (*chain_name).to_string())
})
.or_else(|| Some("<chain-or-chainspec>".to_string()))
}
impl<C: ChainSpecParser> DownloadCommand<C> {
/// Returns the underlying chain being used to run this command
pub fn chain_spec(&self) -> Option<&Arc<C::ChainSpec>> {
@@ -775,11 +925,12 @@ struct DownloadStartupSummary {
fn summarize_download_startup(
all_downloads: &[PlannedArchive],
target_dir: &Path,
static_files_dir: Option<&Path>,
) -> Result<DownloadStartupSummary> {
let mut summary = DownloadStartupSummary::default();
for planned in all_downloads {
if verify_output_files(target_dir, &planned.archive.output_files)? {
if verify_output_files(target_dir, static_files_dir, &planned.archive.output_files)? {
summary.reusable += 1;
} else {
summary.needs_download += 1;
@@ -1029,12 +1180,73 @@ impl CompressionFormat {
}
}
/// Resolves the filesystem path for an output file, remapping `static_files/`
/// entries to the custom static files directory when one is configured.
fn resolve_output_path(
target_dir: &Path,
relative_path: &str,
static_files_dir: Option<&Path>,
) -> PathBuf {
if let Some(sf_dir) = static_files_dir &&
let Some(rest) = relative_path.strip_prefix(STATIC_FILES_PREFIX)
{
return sf_dir.join(rest);
}
target_dir.join(relative_path)
}
/// Unpacks a tar archive entry-by-entry, remapping `static_files/` paths to
/// the custom static files directory when one is configured.
fn unpack_archive_with_remap<R: Read>(
archive: &mut Archive<R>,
target_dir: &Path,
static_files_dir: Option<&Path>,
) -> Result<()> {
// Fast path: no remapping needed, use standard unpack.
let Some(sf_dir) = static_files_dir else {
archive.unpack(target_dir)?;
return Ok(());
};
fs::create_dir_all(sf_dir)?;
for entry in archive.entries()? {
let mut entry = entry?;
let path = entry.path()?.into_owned();
let path_str = path.to_string_lossy();
if let Some(rest) = path_str.strip_prefix(STATIC_FILES_PREFIX) {
if rest.is_empty() {
// Directory entry for `static_files/` itself — skip, we
// already created the custom directory above.
continue;
}
let dest = sf_dir.join(rest);
if let Some(parent) = dest.parent() {
fs::create_dir_all(parent)?;
}
entry.unpack(&dest)?;
} else if path_str == "static_files" {
// Bare directory entry without trailing slash.
continue;
} else {
let dest = target_dir.join(&path);
if let Some(parent) = dest.parent() {
fs::create_dir_all(parent)?;
}
entry.unpack(&dest)?;
}
}
Ok(())
}
/// Extracts a compressed tar archive to the target directory with progress tracking.
fn extract_archive<R: Read>(
reader: R,
total_size: u64,
format: CompressionFormat,
target_dir: &Path,
static_files_dir: Option<&Path>,
cancel_token: CancellationToken,
) -> Result<()> {
let progress_reader = ProgressReader::new(reader, total_size, cancel_token);
@@ -1042,11 +1254,11 @@ fn extract_archive<R: Read>(
match format {
CompressionFormat::Lz4 => {
let decoder = Decoder::new(progress_reader)?;
Archive::new(decoder).unpack(target_dir)?;
unpack_archive_with_remap(&mut Archive::new(decoder), target_dir, static_files_dir)?;
}
CompressionFormat::Zstd => {
let decoder = ZstdDecoder::new(progress_reader)?;
Archive::new(decoder).unpack(target_dir)?;
unpack_archive_with_remap(&mut Archive::new(decoder), target_dir, static_files_dir)?;
}
}
@@ -1059,20 +1271,34 @@ fn extract_archive_raw<R: Read>(
reader: R,
format: CompressionFormat,
target_dir: &Path,
static_files_dir: Option<&Path>,
) -> Result<()> {
match format {
CompressionFormat::Lz4 => {
Archive::new(Decoder::new(reader)?).unpack(target_dir)?;
unpack_archive_with_remap(
&mut Archive::new(Decoder::new(reader)?),
target_dir,
static_files_dir,
)?;
}
CompressionFormat::Zstd => {
Archive::new(ZstdDecoder::new(reader)?).unpack(target_dir)?;
unpack_archive_with_remap(
&mut Archive::new(ZstdDecoder::new(reader)?),
target_dir,
static_files_dir,
)?;
}
}
Ok(())
}
/// Extracts a snapshot from a local file.
fn extract_from_file(path: &Path, format: CompressionFormat, target_dir: &Path) -> Result<()> {
fn extract_from_file(
path: &Path,
format: CompressionFormat,
target_dir: &Path,
static_files_dir: Option<&Path>,
) -> Result<()> {
let file = std::fs::File::open(path)?;
let total_size = file.metadata()?.len();
info!(target: "reth::cli",
@@ -1081,7 +1307,14 @@ fn extract_from_file(path: &Path, format: CompressionFormat, target_dir: &Path)
"Extracting local archive"
);
let start = Instant::now();
extract_archive(file, total_size, format, target_dir, CancellationToken::new())?;
extract_archive(
file,
total_size,
format,
target_dir,
static_files_dir,
CancellationToken::new(),
)?;
info!(target: "reth::cli",
file = %path.display(),
elapsed = %DownloadProgress::format_duration(start.elapsed()),
@@ -1338,6 +1571,7 @@ fn streaming_download_and_extract(
url: &str,
format: CompressionFormat,
target_dir: &Path,
static_files_dir: Option<&Path>,
shared: Option<&Arc<SharedProgress>>,
cancel_token: CancellationToken,
) -> Result<()> {
@@ -1387,10 +1621,17 @@ fn streaming_download_and_extract(
let result = if let Some(sp) = shared {
let reader = SharedProgressReader { inner: response, progress: Arc::clone(sp) };
extract_archive_raw(reader, format, target_dir)
extract_archive_raw(reader, format, target_dir, static_files_dir)
} else {
let total_size = response.content_length().unwrap_or(0);
extract_archive(response, total_size, format, target_dir, cancel_token.clone())
extract_archive(
response,
total_size,
format,
target_dir,
static_files_dir,
cancel_token.clone(),
)
};
match result {
@@ -1423,6 +1664,7 @@ fn download_and_extract(
url: &str,
format: CompressionFormat,
target_dir: &Path,
static_files_dir: Option<&Path>,
shared: Option<&Arc<SharedProgress>>,
cancel_token: CancellationToken,
) -> Result<()> {
@@ -1444,9 +1686,9 @@ fn download_and_extract(
if quiet {
// Skip progress tracking for extraction in parallel mode
extract_archive_raw(file, format, target_dir)?;
extract_archive_raw(file, format, target_dir, static_files_dir)?;
} else {
extract_archive(file, total_size, format, target_dir, cancel_token)?;
extract_archive(file, total_size, format, target_dir, static_files_dir, cancel_token)?;
info!(target: "reth::cli",
file = %file_name,
"Extraction complete"
@@ -1470,6 +1712,7 @@ fn download_and_extract(
fn blocking_download_and_extract(
url: &str,
target_dir: &Path,
static_files_dir: Option<&Path>,
shared: Option<Arc<SharedProgress>>,
resumable: bool,
cancel_token: CancellationToken,
@@ -1482,7 +1725,7 @@ fn blocking_download_and_extract(
let file_path = parsed_url
.to_file_path()
.map_err(|_| eyre::eyre!("Invalid file:// URL path: {}", url))?;
let result = extract_from_file(&file_path, format, target_dir);
let result = extract_from_file(&file_path, format, target_dir, static_files_dir);
if result.is_ok() &&
let Some(sp) = shared
{
@@ -1490,10 +1733,23 @@ fn blocking_download_and_extract(
}
result
} else if resumable {
download_and_extract(url, format, target_dir, shared.as_ref(), cancel_token)
download_and_extract(
url,
format,
target_dir,
static_files_dir,
shared.as_ref(),
cancel_token,
)
} else {
let result =
streaming_download_and_extract(url, format, target_dir, shared.as_ref(), cancel_token);
let result = streaming_download_and_extract(
url,
format,
target_dir,
static_files_dir,
shared.as_ref(),
cancel_token,
);
if result.is_ok() &&
let Some(sp) = shared
{
@@ -1511,6 +1767,7 @@ fn blocking_download_and_extract(
async fn stream_and_extract(
url: &str,
target_dir: &Path,
static_files_dir: Option<PathBuf>,
shared: Option<Arc<SharedProgress>>,
resumable: bool,
cancel_token: CancellationToken,
@@ -1518,7 +1775,14 @@ async fn stream_and_extract(
let target_dir = target_dir.to_path_buf();
let url = url.to_string();
task::spawn_blocking(move || {
blocking_download_and_extract(&url, &target_dir, shared, resumable, cancel_token)
blocking_download_and_extract(
&url,
&target_dir,
static_files_dir.as_deref(),
shared,
resumable,
cancel_token,
)
})
.await??;
@@ -1528,18 +1792,21 @@ async fn stream_and_extract(
async fn process_modular_archive(
planned: PlannedArchive,
target_dir: &Path,
static_files_dir: Option<&Path>,
cache_dir: Option<&Path>,
shared: Option<Arc<SharedProgress>>,
resumable: bool,
cancel_token: CancellationToken,
) -> Result<()> {
let target_dir = target_dir.to_path_buf();
let static_files_dir = static_files_dir.map(Path::to_path_buf);
let cache_dir = cache_dir.map(Path::to_path_buf);
task::spawn_blocking(move || {
blocking_process_modular_archive(
&planned,
&target_dir,
static_files_dir.as_deref(),
cache_dir.as_deref(),
shared,
resumable,
@@ -1554,13 +1821,14 @@ async fn process_modular_archive(
fn blocking_process_modular_archive(
planned: &PlannedArchive,
target_dir: &Path,
static_files_dir: Option<&Path>,
cache_dir: Option<&Path>,
shared: Option<Arc<SharedProgress>>,
resumable: bool,
cancel_token: CancellationToken,
) -> Result<()> {
let archive = &planned.archive;
if verify_output_files(target_dir, &archive.output_files)? {
if verify_output_files(target_dir, static_files_dir, &archive.output_files)? {
if let Some(sp) = &shared {
sp.add(archive.size);
sp.archive_done();
@@ -1572,7 +1840,7 @@ fn blocking_process_modular_archive(
let format = CompressionFormat::from_url(&archive.file_name)?;
let mut last_error: Option<eyre::Error> = None;
for attempt in 1..=MAX_DOWNLOAD_RETRIES {
cleanup_output_files(target_dir, &archive.output_files);
cleanup_output_files(target_dir, static_files_dir, &archive.output_files);
if resumable {
let cache_dir = cache_dir.ok_or_else(|| eyre::eyre!("Missing cache directory"))?;
@@ -1582,7 +1850,7 @@ fn blocking_process_modular_archive(
resumable_download(&archive.url, cache_dir, shared.as_ref(), cancel_token.clone())
.and_then(|(downloaded_path, _)| {
let file = fs::open(&downloaded_path)?;
extract_archive_raw(file, format, target_dir)
extract_archive_raw(file, format, target_dir, static_files_dir)
});
let _ = fs::remove_file(&archive_path);
let _ = fs::remove_file(&part_path);
@@ -1607,12 +1875,13 @@ fn blocking_process_modular_archive(
&archive.url,
format,
target_dir,
static_files_dir,
shared.as_ref(),
cancel_token.clone(),
)?;
}
if verify_output_files(target_dir, &archive.output_files)? {
if verify_output_files(target_dir, static_files_dir, &archive.output_files)? {
if let Some(sp) = &shared {
sp.archive_done();
}
@@ -1636,13 +1905,17 @@ fn blocking_process_modular_archive(
)
}
fn verify_output_files(target_dir: &Path, output_files: &[OutputFileChecksum]) -> Result<bool> {
fn verify_output_files(
target_dir: &Path,
static_files_dir: Option<&Path>,
output_files: &[OutputFileChecksum],
) -> Result<bool> {
if output_files.is_empty() {
return Ok(false);
}
for expected in output_files {
let output_path = target_dir.join(&expected.path);
let output_path = resolve_output_path(target_dir, &expected.path, static_files_dir);
let meta = match fs::metadata(&output_path) {
Ok(meta) => meta,
Err(_) => return Ok(false),
@@ -1660,9 +1933,13 @@ fn verify_output_files(target_dir: &Path, output_files: &[OutputFileChecksum]) -
Ok(true)
}
fn cleanup_output_files(target_dir: &Path, output_files: &[OutputFileChecksum]) {
fn cleanup_output_files(
target_dir: &Path,
static_files_dir: Option<&Path>,
output_files: &[OutputFileChecksum],
) {
for output in output_files {
let _ = fs::remove_file(target_dir.join(&output.path));
let _ = fs::remove_file(resolve_output_path(target_dir, &output.path, static_files_dir));
}
}
@@ -1903,7 +2180,8 @@ fn resolve_manifest_base_url(manifest: &SnapshotManifest, source: &str) -> Resul
mod tests {
use super::*;
use clap::{Args, Parser};
use manifest::{ComponentManifest, SingleArchive};
use manifest::{ChunkedArchive, ComponentManifest, SingleArchive};
use reth_chainspec::{HOLESKY, MAINNET};
use reth_ethereum_cli::chainspec::EthereumChainSpecParser;
use tempfile::tempdir;
@@ -1944,6 +2222,46 @@ mod tests {
}
}
fn manifest_with_modular_components(block: u64) -> SnapshotManifest {
let mut components = BTreeMap::new();
for ty in [
SnapshotComponentType::State,
SnapshotComponentType::Headers,
SnapshotComponentType::Transactions,
SnapshotComponentType::Receipts,
SnapshotComponentType::AccountChangesets,
SnapshotComponentType::StorageChangesets,
] {
let component = if ty.is_chunked() {
ComponentManifest::Chunked(ChunkedArchive {
blocks_per_file: 1_000_000,
total_blocks: block + 1,
chunk_sizes: vec![1],
chunk_output_files: vec![vec![]],
})
} else {
ComponentManifest::Single(SingleArchive {
file: format!("{}.tar.zst", ty.key()),
size: 1,
blake3: None,
output_files: vec![],
})
};
components.insert(ty.key().to_string(), component);
}
SnapshotManifest {
block,
chain_id: 1,
storage_version: 2,
timestamp: 0,
base_url: Some("https://example.com".to_string()),
reth_version: None,
components,
}
}
#[test]
fn test_download_defaults_builder() {
let defaults = DownloadDefaults::default()
@@ -2029,6 +2347,57 @@ mod tests {
assert!(!args.resumable);
}
#[test]
fn resolve_components_supports_since_and_distance_flags() {
let manifest = manifest_with_modular_components(20_000_000);
let args = CommandParser::<DownloadCommand<EthereumChainSpecParser>>::parse_from([
"reth",
"--with-txs-since",
"15537394",
"--with-receipts-distance",
"10064",
"--with-state-history-since",
"15537394",
])
.args;
let resolved = args.resolve_components(&manifest).unwrap();
assert_eq!(resolved.preset, None);
assert_eq!(
resolved.selections.get(&SnapshotComponentType::Transactions),
Some(&ComponentSelection::Since(15_537_394))
);
assert_eq!(
resolved.selections.get(&SnapshotComponentType::Receipts),
Some(&ComponentSelection::Distance(10_064))
);
assert_eq!(
resolved.selections.get(&SnapshotComponentType::AccountChangesets),
Some(&ComponentSelection::Since(15_537_394))
);
assert_eq!(
resolved.selections.get(&SnapshotComponentType::StorageChangesets),
Some(&ComponentSelection::Since(15_537_394))
);
}
#[test]
fn resolve_components_rejects_since_after_snapshot() {
let manifest = manifest_with_modular_components(20_000_000);
let args = CommandParser::<DownloadCommand<EthereumChainSpecParser>>::parse_from([
"reth",
"--with-txs-since",
"20000001",
])
.args;
let err = args.resolve_components(&manifest).unwrap_err();
assert!(err
.to_string()
.contains("--with-txs-since 20000001 is beyond the snapshot block 20000000"));
}
#[test]
fn test_compression_format_detection() {
assert!(matches!(
@@ -2151,7 +2520,7 @@ mod tests {
},
];
let summary = summarize_download_startup(&planned, target_dir).unwrap();
let summary = summarize_download_startup(&planned, target_dir, None).unwrap();
assert_eq!(summary.reusable, 1);
assert_eq!(summary.needs_download, 2);
}
@@ -2217,4 +2586,28 @@ mod tests {
assert_eq!(planned[1].ty, SnapshotComponentType::RocksdbIndices);
assert_eq!(planned[2].ty, SnapshotComponentType::Transactions);
}
#[test]
fn startup_node_command_omits_default_chain_arg() {
let command =
startup_node_command_for_binary::<EthereumChainSpecParser>("reth", MAINNET.as_ref());
assert_eq!(command, "reth node");
}
#[test]
fn startup_node_command_includes_non_default_chain_arg() {
let command =
startup_node_command_for_binary::<EthereumChainSpecParser>("reth", HOLESKY.as_ref());
assert_eq!(command, "reth node --chain holesky");
}
#[test]
fn startup_node_command_uses_running_binary_name() {
let command =
startup_node_command_for_binary::<EthereumChainSpecParser>("tempo", HOLESKY.as_ref());
assert_eq!(command, "tempo node --chain holesky");
}
}

View File

@@ -262,6 +262,7 @@ impl SelectorApp {
ComponentSelection::None => return 0,
ComponentSelection::All => None,
ComponentSelection::Distance(d) => Some(d),
ComponentSelection::Since(block) => Some(self.manifest.block - block + 1),
};
self.groups[group_idx]
.types
@@ -344,6 +345,7 @@ fn format_selection(sel: &ComponentSelection) -> String {
match sel {
ComponentSelection::All => "All".to_string(),
ComponentSelection::Distance(d) => format!("Last {d} blocks"),
ComponentSelection::Since(block) => format!("Since block {block}"),
ComponentSelection::None => "None".to_string(),
}
}

View File

@@ -78,9 +78,10 @@ impl<C: ChainSpecParser<ChainSpec: EthChainSpec + EthereumHardforks>> InitStateC
self.env.init::<N>(AccessRights::RW, runtime)?;
let static_file_provider = provider_factory.static_file_provider();
let provider_rw = provider_factory.database_provider_rw()?;
if self.without_evm {
let provider_rw = provider_factory.database_provider_rw()?;
// ensure header, total difficulty and header hash are provided
let header = self.header.ok_or_else(|| eyre::eyre!("Header file must be provided"))?;
let header = without_evm::read_header_from_file::<
@@ -106,23 +107,22 @@ impl<C: ChainSpecParser<ChainSpec: EthChainSpec + EthereumHardforks>> InitStateC
// SAFETY: it's safe to commit static files, since in the event of a crash, they
// will be unwound according to database checkpoints.
//
// Necessary to commit, so the header is accessible to provider_rw and
// init_state_dump
// Necessary to commit, so the header is accessible to init_from_state_dump
static_file_provider.commit()?;
} else if last_block_number > 0 && last_block_number < header.number() {
return Err(eyre::eyre!(
"Data directory should be empty when calling init-state with --without-evm."
));
}
provider_rw.commit()?;
}
info!(target: "reth::cli", "Initiating state dump");
let reader = BufReader::new(reth_fs_util::open(self.state)?);
let hash = init_from_state_dump(reader, &provider_rw, config.stages.etl)?;
provider_rw.commit()?;
let hash = init_from_state_dump(reader, &provider_factory, config.stages.etl)?;
info!(target: "reth::cli", hash = ?hash, "Genesis block written");
Ok(())

View File

@@ -19,6 +19,7 @@ use reth_trie::{
MultiProofTargets, StorageMultiProof, StorageProof, TrieInput,
};
use std::{
fmt,
sync::{
atomic::{AtomicU64, AtomicUsize, Ordering},
Arc,
@@ -147,6 +148,29 @@ pub enum CachedStatus<T> {
Cached(T),
}
/// The source that is using the execution cache.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum CachedStateMetricsSource {
/// Engine (validation).
Engine,
/// Payload builder.
Builder,
/// Tests.
#[cfg(any(test, feature = "test-utils"))]
Test,
}
impl fmt::Display for CachedStateMetricsSource {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
match self {
Self::Engine => f.write_str("engine"),
Self::Builder => f.write_str("builder"),
#[cfg(any(test, feature = "test-utils"))]
Self::Test => f.write_str("test"),
}
}
}
/// Metrics for the cached state provider, showing hits / misses for each cache
#[derive(Metrics, Clone)]
#[metrics(scope = "sync.caching")]
@@ -222,9 +246,10 @@ impl CachedStateMetrics {
self.account_cache_collisions.set(0);
}
/// Returns a new zeroed-out instance of [`CachedStateMetrics`].
pub fn zeroed() -> Self {
let zeroed = Self::default();
/// Returns a new zeroed-out instance of [`CachedStateMetrics`] with a `source` label
/// to distinguish between different callers (e.g., engine vs builder).
pub fn zeroed(source: CachedStateMetricsSource) -> Self {
let zeroed = Self::new_with_labels(&[("source", source.to_string())]);
zeroed.reset();
zeroed
}
@@ -919,27 +944,15 @@ pub struct SavedCache {
/// The caches used for the provider.
caches: ExecutionCache,
/// Metrics for the cached state provider (includes size/capacity/collisions from fixed-cache)
metrics: CachedStateMetrics,
/// A guard to track in-flight usage of this cache.
/// The cache is considered available if the strong count is 1.
usage_guard: Arc<()>,
/// Whether to skip cache metrics recording (can be expensive with large cached state).
disable_cache_metrics: bool,
}
impl SavedCache {
/// Creates a new instance with the internals
pub fn new(hash: B256, caches: ExecutionCache, metrics: CachedStateMetrics) -> Self {
Self { hash, caches, metrics, usage_guard: Arc::new(()), disable_cache_metrics: false }
}
/// Sets whether to disable cache metrics recording.
pub const fn with_disable_cache_metrics(mut self, disable: bool) -> Self {
self.disable_cache_metrics = disable;
self
pub fn new(hash: B256, caches: ExecutionCache) -> Self {
Self { hash, caches, usage_guard: Arc::new(()) }
}
/// Returns the hash for this cache
@@ -947,11 +960,6 @@ impl SavedCache {
self.hash
}
/// Splits the cache into its caches, metrics, and `disable_cache_metrics` flag, consuming it.
pub fn split(self) -> (ExecutionCache, CachedStateMetrics, bool) {
(self.caches, self.metrics, self.disable_cache_metrics)
}
/// Returns true if the cache is available for use (no other tasks are currently using it).
pub fn is_available(&self) -> bool {
Arc::strong_count(&self.usage_guard) == 1
@@ -967,20 +975,11 @@ impl SavedCache {
&self.caches
}
/// Returns the metrics associated with this cache.
pub const fn metrics(&self) -> &CachedStateMetrics {
&self.metrics
}
/// Updates the cache metrics (size/capacity/collisions) from the stats handlers.
///
/// Note: This can be expensive with large cached state. Use
/// `with_disable_cache_metrics(true)` to skip.
pub fn update_metrics(&self) {
if self.disable_cache_metrics {
return
pub fn update_metrics(&self, metrics: Option<&CachedStateMetrics>) {
if let Some(metrics) = metrics {
self.caches.update_metrics(metrics);
}
self.caches.update_metrics(&self.metrics);
}
/// Clears all caches, resetting them to empty state,
@@ -1017,8 +1016,11 @@ mod tests {
provider.extend_accounts(vec![(address, account)]);
let caches = ExecutionCache::new(1000);
let state_provider =
CachedStateProvider::new(provider, caches, CachedStateMetrics::zeroed());
let state_provider = CachedStateProvider::new(
provider,
caches,
CachedStateMetrics::zeroed(CachedStateMetricsSource::Test),
);
let res = state_provider.storage(address, storage_key);
assert!(res.is_ok());
@@ -1037,8 +1039,11 @@ mod tests {
provider.extend_accounts(vec![(address, account)]);
let caches = ExecutionCache::new(1000);
let state_provider =
CachedStateProvider::new(provider, caches, CachedStateMetrics::zeroed());
let state_provider = CachedStateProvider::new(
provider,
caches,
CachedStateMetrics::zeroed(CachedStateMetricsSource::Test),
);
let res = state_provider.storage(address, storage_key);
assert!(res.is_ok());
@@ -1075,7 +1080,7 @@ mod tests {
#[test]
fn test_saved_cache_is_available() {
let execution_cache = ExecutionCache::new(1000);
let cache = SavedCache::new(B256::ZERO, execution_cache, CachedStateMetrics::zeroed());
let cache = SavedCache::new(B256::ZERO, execution_cache);
assert!(cache.is_available(), "Cache should be available initially");
@@ -1087,8 +1092,7 @@ mod tests {
#[test]
fn test_saved_cache_multiple_references() {
let execution_cache = ExecutionCache::new(1000);
let cache =
SavedCache::new(B256::from([2u8; 32]), execution_cache, CachedStateMetrics::zeroed());
let cache = SavedCache::new(B256::from([2u8; 32]), execution_cache);
let guard1 = cache.clone_guard_for_test();
let guard2 = cache.clone_guard_for_test();

View File

@@ -165,11 +165,7 @@ mod tests {
let hash = B256::from([1u8; 32]);
cache.update_with_guard(|slot| {
*slot = Some(SavedCache::new(
hash,
ExecutionCache::new(1_000),
CachedStateMetrics::zeroed(),
))
*slot = Some(SavedCache::new(hash, ExecutionCache::new(1_000)))
});
let first = cache.get_cache_for(hash);
@@ -185,11 +181,7 @@ mod tests {
let hash = B256::from([2u8; 32]);
cache.update_with_guard(|slot| {
*slot = Some(SavedCache::new(
hash,
ExecutionCache::new(1_000),
CachedStateMetrics::zeroed(),
))
*slot = Some(SavedCache::new(hash, ExecutionCache::new(1_000)))
});
let checked_out = cache.get_cache_for(hash);
@@ -207,11 +199,7 @@ mod tests {
let hash_b = B256::from([0xBB; 32]);
cache.update_with_guard(|slot| {
*slot = Some(SavedCache::new(
hash_a,
ExecutionCache::new(1_000),
CachedStateMetrics::zeroed(),
))
*slot = Some(SavedCache::new(hash_a, ExecutionCache::new(1_000)))
});
let checked_out = cache.get_cache_for(hash_b);

View File

@@ -41,7 +41,7 @@ reth-trie-db.workspace = true
alloy-evm.workspace = true
alloy-consensus.workspace = true
alloy-eips.workspace = true
alloy-eip7928.workspace = true
alloy-eip7928 = { workspace = true, features = ["rlp"] }
alloy-primitives.workspace = true
alloy-rlp.workspace = true
alloy-rpc-types-engine.workspace = true
@@ -60,6 +60,7 @@ metrics.workspace = true
reth-metrics = { workspace = true, features = ["common"] }
# misc
indexmap.workspace = true
schnellru.workspace = true
rayon.workspace = true
tracing.workspace = true

View File

@@ -1,6 +1,7 @@
use crate::tree::metrics::BlockBufferMetrics;
use alloy_consensus::BlockHeader;
use alloy_primitives::{BlockHash, BlockNumber};
use indexmap::IndexSet;
use reth_primitives_traits::{Block, SealedBlock};
use std::collections::{BTreeMap, HashMap, HashSet, VecDeque};
@@ -22,7 +23,7 @@ pub struct BlockBuffer<B: Block> {
/// Map of any parent block hash (even the ones not currently in the buffer)
/// to the buffered children.
/// Allows connecting buffered blocks by parent.
pub(crate) parent_to_child: HashMap<BlockHash, HashSet<BlockHash>>,
pub(crate) parent_to_child: HashMap<BlockHash, IndexSet<BlockHash>>,
/// `BTreeMap` tracking the earliest blocks by block number.
/// Used for removal of old blocks that precede finalization.
pub(crate) earliest_blocks: BTreeMap<BlockNumber, HashSet<BlockHash>>,
@@ -139,7 +140,7 @@ impl<B: Block> BlockBuffer<B> {
fn remove_from_parent(&mut self, parent_hash: BlockHash, hash: &BlockHash) {
// remove from parent to child connection, but only for this block parent.
if let Some(entry) = self.parent_to_child.get_mut(&parent_hash) {
entry.remove(hash);
entry.swap_remove(hash);
// if set is empty remove block entry.
if entry.is_empty() {
self.parent_to_child.remove(&parent_hash);

View File

@@ -71,7 +71,8 @@ pub use payload_validator::{BasicEngineValidator, EngineValidator};
pub use persistence_state::PersistenceState;
pub use reth_engine_primitives::TreeConfig;
pub use reth_execution_cache::{
CachedStateMetrics, CachedStateProvider, ExecutionCache, PayloadExecutionCache, SavedCache,
CachedStateMetrics, CachedStateMetricsSource, CachedStateProvider, ExecutionCache,
PayloadExecutionCache, SavedCache,
};
pub mod state;
@@ -1692,7 +1693,6 @@ where
let gas_used = payload.gas_used();
let num_hash = payload.num_hash();
let mut output = self.on_new_payload(payload);
let latency = start.elapsed();
self.metrics.engine.new_payload.update_response_metrics(
start,
&mut self.metrics.engine.forkchoice_updated.latest_finish_at,
@@ -1700,6 +1700,12 @@ where
gas_used,
);
// Latency measures time from enqueue to completion, excluding
// only the explicit persistence wait. This means backpressure
// (time spent queued due to the engine being busy) is included,
// reflecting real-world engine responsiveness.
let latency = enqueued_at.elapsed() - explicit_persistence_wait;
let maybe_event =
output.as_mut().ok().and_then(|out| out.event.take());

View File

@@ -4,8 +4,8 @@ use super::precompile_cache::PrecompileCacheMap;
use crate::tree::{
payload_processor::prewarm::{PrewarmCacheTask, PrewarmContext, PrewarmMode, PrewarmTaskEvent},
sparse_trie::SparseTrieCacheTask,
CacheWaitDurations, CachedStateMetrics, ExecutionCache, PayloadExecutionCache, SavedCache,
StateProviderBuilder, TreeConfig, WaitForCaches,
CacheWaitDurations, CachedStateMetrics, CachedStateMetricsSource, ExecutionCache,
PayloadExecutionCache, SavedCache, StateProviderBuilder, TreeConfig, WaitForCaches,
};
use alloy_eip7928::BlockAccessList;
use alloy_eips::{eip1898::BlockWithParent, eip4895::Withdrawal};
@@ -96,6 +96,8 @@ where
executor: Runtime,
/// The most recent cache used for execution.
execution_cache: PayloadExecutionCache,
/// Metrics for the execution cache.
cache_metrics: Option<CachedStateMetrics>,
/// Metrics for trie operations
trie_metrics: MultiProofTaskMetrics,
/// Cross-block cache size in bytes.
@@ -120,8 +122,6 @@ where
sparse_trie_max_hot_accounts: usize,
/// Whether sparse trie cache pruning is fully disabled.
disable_sparse_trie_cache_pruning: bool,
/// Whether to disable cache metrics recording.
disable_cache_metrics: bool,
}
impl<N, Evm> PayloadProcessor<Evm>
@@ -155,7 +155,8 @@ where
sparse_trie_max_hot_slots: config.sparse_trie_max_hot_slots(),
sparse_trie_max_hot_accounts: config.sparse_trie_max_hot_accounts(),
disable_sparse_trie_cache_pruning: config.disable_sparse_trie_cache_pruning(),
disable_cache_metrics: config.disable_cache_metrics(),
cache_metrics: (!config.disable_cache_metrics())
.then(|| CachedStateMetrics::zeroed(CachedStateMetricsSource::Engine)),
}
}
}
@@ -482,6 +483,7 @@ where
saved_cache: saved_cache.clone(),
provider: provider_builder,
metrics: PrewarmMetrics::default(),
cache_metrics: self.cache_metrics.clone(),
terminate_execution: Arc::new(AtomicBool::new(false)),
executed_tx_index: Arc::clone(&executed_tx_index),
precompile_cache_disabled: self.precompile_cache_disabled,
@@ -509,7 +511,12 @@ where
});
}
CacheTaskHandle { saved_cache, to_prewarm_task: Some(to_prewarm_task), executed_tx_index }
CacheTaskHandle {
saved_cache,
to_prewarm_task: Some(to_prewarm_task),
executed_tx_index,
cache_metrics: self.cache_metrics.clone(),
}
}
/// Returns the cache for the given parent hash.
@@ -525,10 +532,10 @@ where
debug!("creating new execution cache on cache miss");
let start = Instant::now();
let cache = ExecutionCache::new(self.cross_block_cache_size);
let metrics = CachedStateMetrics::zeroed();
metrics.record_cache_creation(start.elapsed());
SavedCache::new(parent_hash, cache, metrics)
.with_disable_cache_metrics(self.disable_cache_metrics)
if let Some(metrics) = &self.cache_metrics {
metrics.record_cache_creation(start.elapsed());
}
SavedCache::new(parent_hash, cache)
}
}
@@ -675,7 +682,7 @@ where
block_with_parent: BlockWithParent,
bundle_state: &BundleState,
) {
let disable_cache_metrics = self.disable_cache_metrics;
let cache_metrics = self.cache_metrics.clone();
self.execution_cache.update_with_guard(|cached| {
if cached.as_ref().is_some_and(|c| c.executed_block_hash() != block_with_parent.parent) {
debug!(
@@ -687,25 +694,19 @@ where
}
// Take existing cache (if any) or create fresh caches
let (caches, cache_metrics, _) = match cached.take() {
Some(existing) => existing.split(),
None => (
ExecutionCache::new(self.cross_block_cache_size),
CachedStateMetrics::zeroed(),
false,
),
let caches = match cached.take() {
Some(existing) => existing.cache().clone(),
None => ExecutionCache::new(self.cross_block_cache_size),
};
// Insert the block's bundle state into cache
let new_cache =
SavedCache::new(block_with_parent.block.hash, caches, cache_metrics)
.with_disable_cache_metrics(disable_cache_metrics);
let new_cache = SavedCache::new(block_with_parent.block.hash, caches);
if new_cache.cache().insert_state(bundle_state).is_err() {
*cached = None;
debug!(target: "engine::caching", "cleared execution cache on update error");
return
}
new_cache.update_metrics();
new_cache.update_metrics(cache_metrics.as_ref());
// Replace with the updated cache
*cached = Some(new_cache);
@@ -799,9 +800,9 @@ impl<Tx, Err, R: Send + Sync + 'static> PayloadHandle<Tx, Err, R> {
self.prewarm_handle.saved_cache.as_ref().map(|cache| cache.cache().clone())
}
/// Returns a clone of the cache metrics used by prewarming
/// Returns engine cache metrics if a cache exists for prewarming.
pub fn cache_metrics(&self) -> Option<CachedStateMetrics> {
self.prewarm_handle.saved_cache.as_ref().map(|cache| cache.metrics().clone())
self.prewarm_handle.cache_metrics.clone()
}
/// Returns a reference to the shared executed transaction index counter.
@@ -852,6 +853,8 @@ pub struct CacheTaskHandle<R> {
/// Shared counter tracking the next transaction index to be executed by the main execution
/// loop. Prewarm workers skip transactions below this index.
executed_tx_index: Arc<AtomicUsize>,
/// Metrics for the execution cache.
cache_metrics: Option<CachedStateMetrics>,
}
impl<R: Send + Sync + 'static> CacheTaskHandle<R> {
@@ -946,8 +949,7 @@ mod tests {
use crate::tree::{
payload_processor::{evm_state_to_hashed_post_state, ExecutionEnv, PayloadProcessor},
precompile_cache::PrecompileCacheMap,
CachedStateMetrics, ExecutionCache, PayloadExecutionCache, SavedCache,
StateProviderBuilder, TreeConfig,
ExecutionCache, PayloadExecutionCache, SavedCache, StateProviderBuilder, TreeConfig,
};
use alloy_eips::eip1898::{BlockNumHash, BlockWithParent};
use alloy_evm::block::StateChangeSource;
@@ -973,7 +975,7 @@ mod tests {
fn make_saved_cache(hash: B256) -> SavedCache {
let execution_cache = ExecutionCache::new(1_000);
SavedCache::new(hash, execution_cache, CachedStateMetrics::zeroed())
SavedCache::new(hash, execution_cache)
}
#[test]

View File

@@ -14,7 +14,8 @@
use crate::tree::{
payload_processor::multiproof::StateRootMessage,
precompile_cache::{CachedPrecompile, PrecompileCacheMap},
CachedStateProvider, ExecutionEnv, PayloadExecutionCache, SavedCache, StateProviderBuilder,
CachedStateMetrics, CachedStateProvider, ExecutionEnv, PayloadExecutionCache, SavedCache,
StateProviderBuilder,
};
use alloy_consensus::transaction::TxHashRef;
use alloy_eip7928::BlockAccessList;
@@ -276,19 +277,20 @@ where
) {
let start = Instant::now();
let Self { execution_cache, ctx: PrewarmContext { env, metrics, saved_cache, .. }, .. } =
self;
let Self {
execution_cache,
ctx: PrewarmContext { env, metrics, cache_metrics, saved_cache, .. },
..
} = self;
let hash = env.hash;
if let Some(saved_cache) = saved_cache {
debug!(target: "engine::caching", parent_hash=?hash, "Updating execution cache");
// Perform all cache operations atomically under the lock
execution_cache.update_with_guard(|cached| {
// consumes the `SavedCache` held by the prewarming task, which releases its usage
// guard
let (caches, cache_metrics, disable_cache_metrics) = saved_cache.split();
let new_cache = SavedCache::new(hash, caches, cache_metrics)
.with_disable_cache_metrics(disable_cache_metrics);
let caches = saved_cache.cache().clone();
let new_cache = SavedCache::new(hash, caches);
// Insert state into cache while holding the lock
// Access the BundleState through the shared ExecutionOutcome
@@ -299,7 +301,7 @@ where
return;
}
new_cache.update_metrics();
new_cache.update_metrics(cache_metrics.as_ref());
if valid_block_rx.recv().is_ok() {
// Replace the shared cache with the new one; the previous cache (if any) is
@@ -521,6 +523,9 @@ where
pub provider: StateProviderBuilder<N, P>,
/// The metrics for the prewarm task.
pub metrics: PrewarmMetrics,
/// Metrics for the execution cache.
/// Metrics for the execution cache. `None` disables metrics recording.
pub cache_metrics: Option<CachedStateMetrics>,
/// An atomic bool that tells prewarm tasks to not start any more execution.
pub terminate_execution: Arc<AtomicBool>,
/// Shared counter tracking the next transaction index to be executed by the main execution
@@ -562,9 +567,11 @@ where
// Use the caches to create a new provider with caching
if let Some(saved_cache) = &self.saved_cache {
let caches = saved_cache.cache().clone();
let cache_metrics = saved_cache.metrics().clone();
state_provider =
Box::new(CachedStateProvider::new_prewarm(state_provider, caches, cache_metrics));
state_provider = Box::new(CachedStateProvider::new_prewarm(
state_provider,
caches,
self.cache_metrics.clone().unwrap_or_default(),
));
}
let state_provider = StateProviderDatabase::new(state_provider);
@@ -665,8 +672,11 @@ where
};
let boxed: Box<dyn AccountReader> = if let Some(saved) = &self.saved_cache {
let caches = saved.cache().clone();
let cache_metrics = saved.metrics().clone();
Box::new(CachedStateProvider::new_prewarm(inner, caches, cache_metrics))
Box::new(CachedStateProvider::new_prewarm(
inner,
caches,
self.cache_metrics.clone().unwrap_or_default(),
))
} else {
Box::new(inner)
};
@@ -761,8 +771,11 @@ where
let saved_cache =
self.saved_cache.as_ref().expect("BAL prewarm should only run with cache");
let caches = saved_cache.cache().clone();
let cache_metrics = saved_cache.metrics().clone();
slot.insert(CachedStateProvider::new_prewarm(built, caches, cache_metrics))
slot.insert(CachedStateProvider::new_prewarm(
built,
caches,
self.cache_metrics.clone().unwrap_or_default(),
))
}
};

View File

@@ -48,7 +48,7 @@ use crate::tree::{
PayloadHandle, StateProviderBuilder, StateProviderDatabase, TreeConfig, WaitForCaches,
};
use alloy_consensus::transaction::{Either, TxHashRef};
use alloy_eip7928::BlockAccessList;
use alloy_eip7928::{bal::Bal, BlockAccessList};
use alloy_eips::{eip1898::BlockWithParent, eip4895::Withdrawal, NumHash};
use alloy_evm::Evm;
use alloy_primitives::{map::B256Set, B256};
@@ -2100,10 +2100,14 @@ impl<T: PayloadTypes> BlockOrPayload<T> {
}
}
/// Returns the block access list if available.
pub const fn block_access_list(&self) -> Option<Result<BlockAccessList, alloy_rlp::Error>> {
// TODO decode and return `BlockAccessList`
None
/// Returns the block access list embedded in a payload, if present.
pub fn block_access_list(&self) -> Option<Result<BlockAccessList, alloy_rlp::Error>> {
match self {
Self::Payload(payload) => payload.block_access_list().map(|block_access_list| {
alloy_rlp::decode_exact::<Bal>(block_access_list.as_ref()).map(Bal::into_inner)
}),
Self::Block(_) => None,
}
}
/// Returns the number of transactions in the payload or block.

View File

@@ -73,7 +73,9 @@ where
}
}
/// Cache entry, precompile successful output.
/// Cache entry for a successful precompile output.
///
/// We intentionally do not cache non-successful statuses or errors.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct CacheEntry<S> {
output: PrecompileOutput,
@@ -180,7 +182,9 @@ where
let result = self.precompile.call(input);
match &result {
Ok(output) => {
// Only successful outputs are cacheable. Non-success statuses and errors must execute
// again instead of poisoning the cache for subsequent calls.
Ok(output) if output.is_success() => {
let size = self.cache.insert(
Bytes::copy_from_slice(calldata),
CacheEntry { output: output.clone(), spec: self.spec_id.clone() },
@@ -228,17 +232,21 @@ mod tests {
use super::*;
use reth_evm::{EthEvmFactory, Evm, EvmEnv, EvmFactory};
use reth_revm::db::EmptyDB;
use revm::{context::TxEnv, precompile::PrecompileOutput};
use revm::{
context::TxEnv,
precompile::{PrecompileOutput, PrecompileStatus},
};
use revm_primitives::hardfork::SpecId;
#[test]
fn test_precompile_cache_basic() {
let dyn_precompile: DynPrecompile = (|_input: PrecompileInput<'_>| -> PrecompileResult {
Ok(PrecompileOutput {
status: PrecompileStatus::Success,
gas_used: 0,
gas_refunded: 0,
state_gas_used: 0,
reservoir: 0,
bytes: Bytes::default(),
reverted: false,
})
})
.into();
@@ -247,10 +255,11 @@ mod tests {
CachedPrecompile::new(dyn_precompile, PrecompileCache::default(), SpecId::PRAGUE, None);
let output = PrecompileOutput {
status: PrecompileStatus::Success,
gas_used: 50,
gas_refunded: 0,
state_gas_used: 0,
reservoir: 0,
bytes: alloy_primitives::Bytes::copy_from_slice(b"cached_result"),
reverted: false,
};
let input = b"test_input";
@@ -279,10 +288,11 @@ mod tests {
assert_eq!(input.data, input_data);
Ok(PrecompileOutput {
status: PrecompileStatus::Success,
gas_used: 5000,
gas_refunded: 0,
state_gas_used: 0,
reservoir: 0,
bytes: alloy_primitives::Bytes::copy_from_slice(b"output_from_precompile_1"),
reverted: false,
})
}
})
@@ -294,10 +304,11 @@ mod tests {
assert_eq!(input.data, input_data);
Ok(PrecompileOutput {
status: PrecompileStatus::Success,
gas_used: 7000,
gas_refunded: 0,
state_gas_used: 0,
reservoir: 0,
bytes: alloy_primitives::Bytes::copy_from_slice(b"output_from_precompile_2"),
reverted: false,
})
}
})

View File

@@ -109,13 +109,8 @@ where
result: &BlockExecutionResult<N::Receipt>,
receipt_root_bloom: Option<ReceiptRootBloom>,
) -> Result<(), ConsensusError> {
let res = validate_block_post_execution(
block,
&self.chain_spec,
&result.receipts,
&result.requests,
receipt_root_bloom,
);
let res =
validate_block_post_execution(block, &self.chain_spec, result, receipt_root_bloom);
if self.skip_requests_hash_check &&
let Err(ConsensusError::BodyRequestsHashDiff(_)) = &res

View File

@@ -1,9 +1,10 @@
use alloc::vec::Vec;
use alloy_consensus::{proofs::calculate_receipt_root, BlockHeader, TxReceipt};
use alloy_eips::{eip7685::Requests, Encodable2718};
use alloy_eips::Encodable2718;
use alloy_primitives::{Bloom, Bytes, B256};
use reth_chainspec::EthereumHardforks;
use reth_consensus::ConsensusError;
use reth_execution_types::BlockExecutionResult;
use reth_primitives_traits::{
receipt::gas_spent_by_transactions, Block, GotExpected, Receipt, RecoveredBlock,
};
@@ -18,8 +19,7 @@ use reth_primitives_traits::{
pub fn validate_block_post_execution<B, R, ChainSpec>(
block: &RecoveredBlock<B>,
chain_spec: &ChainSpec,
receipts: &[R],
requests: &Requests,
result: &BlockExecutionResult<R>,
receipt_root_bloom: Option<(B256, Bloom)>,
) -> Result<(), ConsensusError>
where
@@ -28,12 +28,10 @@ where
ChainSpec: EthereumHardforks,
{
// Check if gas used matches the value set in header.
let cumulative_gas_used =
receipts.last().map(|receipt| receipt.cumulative_gas_used()).unwrap_or(0);
if block.header().gas_used() != cumulative_gas_used {
if block.header().gas_used() != result.gas_used {
return Err(ConsensusError::BlockGasUsed {
gas: GotExpected { got: cumulative_gas_used, expected: block.header().gas_used() },
gas_spent_by_tx: gas_spent_by_transactions(receipts),
gas: GotExpected { got: result.gas_used, expected: block.header().gas_used() },
gas_spent_by_tx: gas_spent_by_transactions(&result.receipts),
})
}
@@ -42,7 +40,7 @@ where
// transaction This was replaced with is_success flag.
// See more about EIP here: https://eips.ethereum.org/EIPS/eip-658
if chain_spec.is_byzantium_active_at_block(block.header().number()) {
let result = if let Some((receipts_root, logs_bloom)) = receipt_root_bloom {
let res = if let Some((receipts_root, logs_bloom)) = receipt_root_bloom {
compare_receipts_root_and_logs_bloom(
receipts_root,
logs_bloom,
@@ -50,11 +48,16 @@ where
block.header().logs_bloom(),
)
} else {
verify_receipts(block.header().receipts_root(), block.header().logs_bloom(), receipts)
verify_receipts(
block.header().receipts_root(),
block.header().logs_bloom(),
&result.receipts,
)
};
if let Err(error) = result {
let receipts = receipts
if let Err(error) = res {
let receipts = result
.receipts
.iter()
.map(|r| Bytes::from(r.with_bloom_ref().encoded_2718()))
.collect::<Vec<_>>();
@@ -68,7 +71,7 @@ where
let Some(header_requests_hash) = block.header().requests_hash() else {
return Err(ConsensusError::RequestsHashMissing)
};
let requests_hash = requests.requests_hash();
let requests_hash = result.requests.requests_hash();
if requests_hash != header_requests_hash {
return Err(ConsensusError::BodyRequestsHashDiff(
GotExpected::new(requests_hash, header_requests_hash).into(),

View File

@@ -10,4 +10,7 @@ pub enum BuiltPayloadConversionError {
/// Unexpected EIP-7594 sidecars in the built payload.
#[error("unexpected EIP-7594 sidecars")]
UnexpectedEip7594Sidecars,
/// Missing block access list (required for V6 envelope).
#[error("missing block access list")]
MissingBlockAccessList,
}

View File

@@ -6,11 +6,11 @@ use alloy_eips::{
eip7594::{BlobTransactionSidecarEip7594, BlobTransactionSidecarVariant},
eip7685::Requests,
};
use alloy_primitives::U256;
use alloy_primitives::{Bytes, U256};
use alloy_rpc_types_engine::{
BlobsBundleV1, BlobsBundleV2, ExecutionPayloadEnvelopeV2, ExecutionPayloadEnvelopeV3,
ExecutionPayloadEnvelopeV4, ExecutionPayloadEnvelopeV5, ExecutionPayloadEnvelopeV6,
ExecutionPayloadFieldV2, ExecutionPayloadV1, ExecutionPayloadV3,
ExecutionPayloadFieldV2, ExecutionPayloadV1, ExecutionPayloadV3, ExecutionPayloadV4,
};
use reth_ethereum_primitives::EthPrimitives;
use reth_payload_primitives::BuiltPayload;
@@ -36,6 +36,8 @@ pub struct EthBuiltPayload<N: NodePrimitives = EthPrimitives> {
pub(crate) sidecars: BlobSidecars,
/// The requests of the payload
pub(crate) requests: Option<Requests>,
/// The block access list of the payload
pub(crate) block_access_list: Option<Bytes>,
}
// === impl BuiltPayload ===
@@ -48,8 +50,9 @@ impl<N: NodePrimitives> EthBuiltPayload<N> {
block: Arc<SealedBlock<N::Block>>,
fees: U256,
requests: Option<Requests>,
block_access_list: Option<Bytes>,
) -> Self {
Self { block, fees, requests, sidecars: BlobSidecars::Empty }
Self { block, fees, requests, sidecars: BlobSidecars::Empty, block_access_list }
}
/// Returns the built block(sealed)
@@ -152,9 +155,39 @@ impl EthBuiltPayload {
/// Try converting built payload into [`ExecutionPayloadEnvelopeV6`].
///
/// Note: Amsterdam fork is not yet implemented, so this conversion is not yet supported.
/// Returns an error if the block access list is missing, as it's required for V6 envelopes.
pub fn try_into_v6(self) -> Result<ExecutionPayloadEnvelopeV6, BuiltPayloadConversionError> {
unimplemented!("ExecutionPayloadEnvelopeV6 not yet supported")
let Self { block, fees, sidecars, requests, block_access_list, .. } = self;
let block_access_list =
block_access_list.ok_or(BuiltPayloadConversionError::MissingBlockAccessList)?;
let blobs_bundle = match sidecars {
BlobSidecars::Empty => BlobsBundleV2::empty(),
BlobSidecars::Eip7594(sidecars) => BlobsBundleV2::from(sidecars),
BlobSidecars::Eip4844(_) => {
return Err(BuiltPayloadConversionError::UnexpectedEip4844Sidecars)
}
};
Ok(ExecutionPayloadEnvelopeV6 {
execution_payload: ExecutionPayloadV4::from_block_unchecked_with_bal(
block.hash(),
&Arc::unwrap_or_clone(block).into_block(),
block_access_list,
),
block_value: fees,
// From the engine API spec:
//
// > Client software **MAY** use any heuristics to decide whether to set
// `shouldOverrideBuilder` flag or not. If client software does not implement any
// heuristic this flag **SHOULD** be set to `false`.
//
// Spec:
// <https://github.com/ethereum/execution-apis/blob/fe8e13c288c592ec154ce25c534e26cb7ce0530d/src/engine/cancun.md#specification-2>
should_override_builder: false,
blobs_bundle,
execution_requests: requests.unwrap_or_default(),
})
}
}

View File

@@ -175,6 +175,7 @@ where
suggested_fee_recipient: attributes.suggested_fee_recipient,
prev_randao: attributes.prev_randao,
gas_limit: attributes.gas_limit,
slot_number: attributes.slot_number,
},
self.chain_spec().next_block_base_fee(parent, attributes.timestamp).unwrap_or_default(),
self.chain_spec(),

View File

@@ -25,7 +25,7 @@ use reth_evm::{
ConfigureEvm, Evm, NextBlockEnvAttributes,
};
use reth_evm_ethereum::EthEvmConfig;
use reth_execution_cache::CachedStateProvider;
use reth_execution_cache::{CachedStateMetrics, CachedStateMetricsSource, CachedStateProvider};
use reth_payload_builder::{BlobSidecars, EthBuiltPayload};
use reth_payload_builder_primitives::PayloadBuilderError;
use reth_payload_primitives::PayloadAttributes;
@@ -172,7 +172,9 @@ where
state_provider = Box::new(CachedStateProvider::new(
state_provider,
execution_cache.cache().clone(),
execution_cache.metrics().clone(),
// It's ok to recreate the cache every time, because it's cheap to do so for a vanilla
// Ethereum builder every 12s.
CachedStateMetrics::zeroed(CachedStateMetricsSource::Builder),
));
}
let state = StateProviderDatabase::new(state_provider.as_ref());
@@ -191,6 +193,7 @@ where
parent_beacon_block_root: attributes.parent_beacon_block_root(),
withdrawals: attributes.withdrawals.clone().map(Into::into),
extra_data: builder_config.extra_data,
slot_number: attributes.slot_number(),
},
)
.map_err(PayloadBuilderError::other)?;
@@ -431,7 +434,7 @@ where
}));
}
let payload = EthBuiltPayload::new(sealed_block, total_fees, requests)
let payload = EthBuiltPayload::new(sealed_block, total_fees, requests, None)
// add blob sidecars from the executed txs
.with_sidecars(blob_sidecars);

View File

@@ -466,7 +466,7 @@ where
self.executor.execute_transaction_with_commit_condition((tx_env, &tx), f)?
{
self.transactions.push(tx);
Ok(Some(gas_used))
Ok(Some(gas_used.tx_gas_used()))
} else {
Ok(None)
}

View File

@@ -117,6 +117,7 @@ pub use alloy_evm::{
/// gas_limit: 30_000_000,
/// withdrawals: Some(withdrawals),
/// parent_beacon_block_root: Some(beacon_root),
/// slot_number: None,
/// };
///
/// // Build a new block on top of parent
@@ -501,4 +502,6 @@ pub struct NextBlockEnvAttributes {
pub withdrawals: Option<Withdrawals>,
/// Optional extra data.
pub extra_data: Bytes,
/// Optional slot number for post-Amsterdam payloads.
pub slot_number: Option<u64>,
}

View File

@@ -346,6 +346,15 @@ impl Discv4 {
self.send_to_service(cmd);
}
/// Adds the node as a bootnode.
///
/// This registers the node in the configured bootstrap set and inserts it into the routing
/// table, pinging it to establish the endpoint proof, same as the nodes provided at startup.
pub fn add_boot_node(&self, node_record: NodeRecord) {
let cmd = Discv4Command::AddBootNode(node_record);
self.send_to_service(cmd);
}
/// Adds the peer and id to the ban list.
///
/// This will prevent any future inclusion in the table
@@ -719,6 +728,15 @@ impl Discv4Service {
}
}
/// Adds the node to the bootstrap set and to the routing table.
///
/// Behaves like [`Self::add_node`] but also registers the node in the configured bootstrap
/// set so it is used for subsequent bootstrap attempts.
pub fn add_boot_node(&mut self, record: NodeRecord) -> bool {
self.config.bootstrap_nodes.insert(record);
self.add_node(record)
}
/// Spawns this services onto a new task
///
/// Note: requires a running tokio runtime
@@ -1750,6 +1768,9 @@ impl Discv4Service {
Discv4Command::Add(enr) => {
self.add_node(enr);
}
Discv4Command::AddBootNode(record) => {
self.add_boot_node(record);
}
Discv4Command::Lookup { node_id, tx } => {
let node_id = node_id.unwrap_or(self.local_node_record.id);
self.lookup_with(node_id, tx);
@@ -2066,6 +2087,7 @@ impl Default for ReceiveCache {
/// The commands sent from the frontend [Discv4] to the service [`Discv4Service`].
enum Discv4Command {
Add(NodeRecord),
AddBootNode(NodeRecord),
SetTcpPort(u16),
SetEIP868RLPPair { key: Vec<u8>, rlp: Bytes },
Ban(PeerId, IpAddr),

View File

@@ -20,6 +20,7 @@ reth-primitives-traits.workspace = true
# ethereum
alloy-chains = { workspace = true, features = ["rlp"] }
alloy-eip7928 = { workspace = true, features = ["rlp"], optional = true }
alloy-eips.workspace = true
alloy-primitives = { workspace = true, features = ["map"] }
alloy-rlp = { workspace = true, features = ["derive"] }
@@ -40,6 +41,7 @@ alloy-hardforks.workspace = true
reth-ethereum-primitives = { workspace = true, features = ["arbitrary"] }
alloy-primitives = { workspace = true, features = ["arbitrary", "rand"] }
alloy-consensus = { workspace = true, features = ["arbitrary"] }
alloy-eip7928 = { workspace = true, features = ["arbitrary", "rlp"] }
alloy-eips = { workspace = true, features = ["arbitrary"] }
alloy-genesis.workspace = true
alloy-chains = { workspace = true, features = ["arbitrary"] }
@@ -53,6 +55,7 @@ default = ["std"]
std = [
"alloy-chains/std",
"alloy-consensus/std",
"alloy-eip7928?/std",
"alloy-eips/std",
"alloy-genesis/std",
"alloy-primitives/std",
@@ -68,6 +71,8 @@ std = [
arbitrary = [
"reth-ethereum-primitives/arbitrary",
"alloy-chains/arbitrary",
"dep:alloy-eip7928",
"alloy-eip7928/arbitrary",
"dep:arbitrary",
"dep:proptest",
"dep:proptest-arbitrary-interop",
@@ -88,4 +93,5 @@ serde = [
"reth-primitives-traits/serde",
"reth-ethereum-primitives/serde",
"alloy-hardforks/serde",
"alloy-eip7928?/serde",
]

View File

@@ -2,7 +2,7 @@
use alloc::vec::Vec;
use alloy_primitives::{Bytes, B256};
use alloy_rlp::{RlpDecodableWrapper, RlpEncodableWrapper};
use alloy_rlp::{BufMut, Decodable, Encodable, Header, RlpDecodableWrapper, RlpEncodableWrapper};
use reth_codecs_derive::add_arbitrary_tests;
/// A request for block access lists from the given block hashes.
@@ -16,12 +16,209 @@ pub struct GetBlockAccessLists(
);
/// Response for [`GetBlockAccessLists`] containing one BAL per requested block hash.
#[derive(Clone, Debug, PartialEq, Eq, RlpEncodableWrapper, RlpDecodableWrapper, Default)]
///
/// The inner `Bytes` values store raw BAL RLP payloads and are encoded as nested RLP items, not
/// as RLP byte strings.
#[derive(Clone, Debug, PartialEq, Eq, Default)]
#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
#[cfg_attr(any(test, feature = "arbitrary"), derive(arbitrary::Arbitrary))]
#[add_arbitrary_tests(rlp)]
pub struct BlockAccessLists(
/// The requested block access lists as opaque bytes. Unavailable entries are represented by
/// empty byte slices.
/// The requested block access lists as raw RLP blobs. Per EIP-8159, unavailable entries are
/// represented by an RLP-encoded empty list (`0xc0`).
pub Vec<Bytes>,
);
impl Encodable for BlockAccessLists {
fn encode(&self, out: &mut dyn BufMut) {
let payload_length = self.0.iter().map(|bytes| bytes.len()).sum();
Header { list: true, payload_length }.encode(out);
for bal in &self.0 {
out.put_slice(bal);
}
}
fn length(&self) -> usize {
let payload_length = self.0.iter().map(|bytes| bytes.len()).sum();
Header { list: true, payload_length }.length_with_payload()
}
}
impl Decodable for BlockAccessLists {
fn decode(buf: &mut &[u8]) -> alloy_rlp::Result<Self> {
let header = Header::decode(buf)?;
if !header.list {
return Err(alloy_rlp::Error::UnexpectedString)
}
let (mut payload, rest) = buf.split_at(header.payload_length);
*buf = rest;
let mut bals = Vec::new();
while !payload.is_empty() {
let item_start = payload;
let item_header = Header::decode(&mut payload)?;
if !item_header.list {
return Err(alloy_rlp::Error::UnexpectedString)
}
let item_length = item_header.length_with_payload();
bals.push(Bytes::copy_from_slice(&item_start[..item_length]));
payload = &payload[item_header.payload_length..];
}
Ok(Self(bals))
}
}
#[cfg(any(test, feature = "arbitrary"))]
impl<'a> arbitrary::Arbitrary<'a> for BlockAccessLists {
fn arbitrary(u: &mut arbitrary::Unstructured<'a>) -> arbitrary::Result<Self> {
let entries = Vec::<Vec<alloy_eip7928::AccountChanges>>::arbitrary(u)?
.into_iter()
.map(|entry| {
let mut out = Vec::new();
alloy_rlp::encode_list(&entry, &mut out);
Bytes::from(out)
})
.collect();
Ok(Self(entries))
}
}
#[cfg(test)]
mod tests {
use super::*;
use alloy_eip7928::{
AccountChanges, BalanceChange, CodeChange, NonceChange, SlotChanges, StorageChange,
};
use alloy_primitives::{Address, U256};
use alloy_rlp::EMPTY_LIST_CODE;
fn elaborate_account_changes(seed: u8) -> Vec<AccountChanges> {
vec![
AccountChanges {
address: Address::from([seed; 20]),
storage_changes: vec![SlotChanges::new(
U256::from_be_bytes([seed.wrapping_add(1); 32]),
vec![
StorageChange::new(1, U256::from_be_bytes([seed.wrapping_add(2); 32])),
StorageChange::new(2, U256::from_be_bytes([seed.wrapping_add(3); 32])),
],
)],
storage_reads: vec![
U256::from_be_bytes([seed.wrapping_add(4); 32]),
U256::from_be_bytes([seed.wrapping_add(5); 32]),
],
balance_changes: vec![
BalanceChange::new(1, U256::from(1_000 + seed as u64)),
BalanceChange::new(2, U256::from(2_000 + seed as u64)),
],
nonce_changes: vec![
NonceChange::new(1, seed as u64),
NonceChange::new(2, seed as u64 + 1),
],
code_changes: vec![CodeChange::new(
1,
Bytes::from(vec![0x60, seed, 0x61, seed.wrapping_add(1), 0x56]),
)],
},
AccountChanges {
address: Address::from([seed.wrapping_add(9); 20]),
storage_changes: Vec::new(),
storage_reads: vec![U256::from_be_bytes([seed.wrapping_add(10); 32])],
balance_changes: vec![BalanceChange::new(3, U256::from(3_000 + seed as u64))],
nonce_changes: vec![NonceChange::new(3, seed as u64 + 2)],
code_changes: vec![CodeChange::new(2, Bytes::from(vec![0x5f, 0x5f, 0xf3]))],
},
]
}
fn elaborate_bal_entry(seed: u8) -> Bytes {
let account_changes = elaborate_account_changes(seed);
let mut out = Vec::new();
alloy_rlp::encode_list(&account_changes, &mut out);
Bytes::from(out)
}
#[test]
fn empty_bal_entry_encodes_as_empty_list() {
let encoded =
alloy_rlp::encode(BlockAccessLists(vec![Bytes::from_static(&[EMPTY_LIST_CODE])]));
assert_eq!(encoded, vec![0xc1, EMPTY_LIST_CODE]);
}
#[test]
fn block_access_lists_roundtrip_preserves_raw_bal_items() {
let original = BlockAccessLists(vec![
Bytes::from_static(&[EMPTY_LIST_CODE]),
Bytes::from_static(&[0xc1, EMPTY_LIST_CODE]),
Bytes::from_static(&[0xc2, EMPTY_LIST_CODE, EMPTY_LIST_CODE]),
]);
let encoded = alloy_rlp::encode(&original);
let decoded = alloy_rlp::decode_exact::<BlockAccessLists>(&encoded).unwrap();
assert_eq!(decoded, original);
}
#[test]
fn empty_response_roundtrips() {
let original = BlockAccessLists(Vec::new());
let encoded = alloy_rlp::encode(&original);
let decoded = alloy_rlp::decode_exact::<BlockAccessLists>(&encoded).unwrap();
assert_eq!(decoded, original);
}
#[test]
fn rejects_non_list_bal_entries() {
let err = alloy_rlp::decode_exact::<BlockAccessLists>(&[0xc1, 0x01]).unwrap_err();
assert!(matches!(err, alloy_rlp::Error::UnexpectedString));
}
#[test]
fn elaborate_bal_entry_roundtrips_into_account_changes() {
let expected = elaborate_account_changes(0x11);
let decoded =
alloy_rlp::decode_exact::<Vec<AccountChanges>>(&elaborate_bal_entry(0x11)).unwrap();
assert_eq!(decoded, expected);
}
#[test]
fn elaborate_block_access_lists_roundtrip_preserves_complex_bal_contents() {
let original = BlockAccessLists(vec![
elaborate_bal_entry(0x11),
Bytes::from_static(&[EMPTY_LIST_CODE]),
elaborate_bal_entry(0x77),
]);
let encoded = alloy_rlp::encode(&original);
let decoded = alloy_rlp::decode_exact::<BlockAccessLists>(&encoded).unwrap();
assert_eq!(decoded, original);
assert_eq!(
alloy_rlp::decode_exact::<Vec<AccountChanges>>(&decoded.0[0]).unwrap(),
elaborate_account_changes(0x11)
);
assert_eq!(alloy_rlp::decode_exact::<Vec<AccountChanges>>(&decoded.0[1]).unwrap(), vec![]);
assert_eq!(
alloy_rlp::decode_exact::<Vec<AccountChanges>>(&decoded.0[2]).unwrap(),
elaborate_account_changes(0x77)
);
}
#[test]
fn elaborate_block_access_lists_embed_raw_bal_payloads_without_reencoding() {
let first = elaborate_bal_entry(0x21);
let second = elaborate_bal_entry(0x42);
let encoded = alloy_rlp::encode(BlockAccessLists(vec![first.clone(), second.clone()]));
let header = alloy_rlp::Header::decode(&mut &encoded[..]).unwrap();
let payload = &encoded[header.length()..];
let expected_payload = [first.as_ref(), second.as_ref()].concat();
assert!(header.list);
assert_eq!(payload, expected_payload.as_slice());
}
}

View File

@@ -115,14 +115,14 @@ pub trait Peers: PeersInfo {
///
/// If the peer already exists, then this will update its tracked info.
fn add_peer(&self, peer: PeerId, tcp_addr: SocketAddr) {
self.add_peer_kind(peer, PeerKind::Static, tcp_addr, None);
self.add_peer_kind(peer, Some(PeerKind::Static), tcp_addr, None);
}
/// Adds a peer to the peer set with TCP and UDP `SocketAddr`.
///
/// If the peer already exists, then this will update its tracked info.
fn add_peer_with_udp(&self, peer: PeerId, tcp_addr: SocketAddr, udp_addr: SocketAddr) {
self.add_peer_kind(peer, PeerKind::Static, tcp_addr, Some(udp_addr));
self.add_peer_kind(peer, Some(PeerKind::Static), tcp_addr, Some(udp_addr));
}
/// Adds a trusted [`PeerId`] to the peer set.
@@ -132,12 +132,12 @@ pub trait Peers: PeersInfo {
/// Adds a trusted peer to the peer set with TCP `SocketAddr`.
fn add_trusted_peer(&self, peer: PeerId, tcp_addr: SocketAddr) {
self.add_peer_kind(peer, PeerKind::Trusted, tcp_addr, None);
self.add_peer_kind(peer, Some(PeerKind::Trusted), tcp_addr, None);
}
/// Adds a trusted peer with TCP and UDP `SocketAddr` to the peer set.
fn add_trusted_peer_with_udp(&self, peer: PeerId, tcp_addr: SocketAddr, udp_addr: SocketAddr) {
self.add_peer_kind(peer, PeerKind::Trusted, tcp_addr, Some(udp_addr));
self.add_peer_kind(peer, Some(PeerKind::Trusted), tcp_addr, Some(udp_addr));
}
/// Adds a peer to the known peer set, with the given kind.
@@ -146,7 +146,7 @@ pub trait Peers: PeersInfo {
fn add_peer_kind(
&self,
peer: PeerId,
kind: PeerKind,
kind: Option<PeerKind>,
tcp_addr: SocketAddr,
udp_addr: Option<SocketAddr>,
);

View File

@@ -125,7 +125,7 @@ where
fn add_peer_kind(
&self,
_peer: PeerId,
_kind: PeerKind,
_kind: Option<PeerKind>,
_tcp_addr: SocketAddr,
_udp_addr: Option<SocketAddr>,
) {

View File

@@ -284,14 +284,22 @@ where
/// Handles [`GetBlockAccessLists`] queries.
///
/// For now this returns one empty BAL per requested hash.
/// EIP-8159 defines the final `BlockAccessLists` response semantics:
/// <https://eips.ethereum.org/EIPS/eip-8159>
fn on_block_access_lists_request(
&self,
_peer_id: PeerId,
request: GetBlockAccessLists,
response: oneshot::Sender<RequestResult<BlockAccessLists>>,
) {
let access_lists = request.0.into_iter().map(|_| Bytes::new()).collect();
// TODO: BAL serving is not fully implemented yet. Per EIP-8159, unavailable BALs are
// returned as empty BAL entries while preserving request order, so we currently return
// one RLP-encoded empty BAL (`0xc0`) per requested hash.
let access_lists = request
.0
.into_iter()
.map(|_| Bytes::from_static(&[alloy_rlp::EMPTY_LIST_CODE]))
.collect();
let _ = response.send(Ok(BlockAccessLists(access_lists)));
}

View File

@@ -190,6 +190,16 @@ impl<N: NetworkPrimitives> NetworkHandle<N> {
pub fn secret_key(&self) -> &SecretKey {
&self.inner.secret_key
}
/// Returns the [`Discv4`] handle if discv4 is enabled.
pub fn discv4(&self) -> Option<&Discv4> {
self.inner.discv4.as_ref()
}
/// Returns the [`Discv5`] handle if discv5 is enabled.
pub fn discv5(&self) -> Option<&Discv5> {
self.inner.discv5.as_ref()
}
}
// === API Implementations ===
@@ -315,7 +325,7 @@ impl<N: NetworkPrimitives> Peers for NetworkHandle<N> {
fn add_peer_kind(
&self,
peer: PeerId,
kind: PeerKind,
kind: Option<PeerKind>,
tcp_addr: SocketAddr,
udp_addr: Option<SocketAddr>,
) {
@@ -516,7 +526,7 @@ pub(crate) enum NetworkHandleMessage<N: NetworkPrimitives = EthNetworkPrimitives
/// Marks a peer as trusted.
AddTrustedPeerId(PeerId),
/// Adds an address for a peer, including its ID, kind, and socket address.
AddPeerAddress(PeerId, PeerKind, PeerAddr),
AddPeerAddress(PeerId, Option<PeerKind>, PeerAddr),
/// Removes a peer from the peerset corresponding to the given kind.
RemovePeer(PeerId, PeerKind),
/// Disconnects a connection to a peer if it exists, optionally providing a disconnect reason.

View File

@@ -308,8 +308,13 @@ impl<N: NetworkPrimitives> NetworkState<N> {
}
/// Adds a peer and its address with the given kind to the peerset.
pub(crate) fn add_peer_kind(&mut self, peer_id: PeerId, kind: PeerKind, addr: PeerAddr) {
self.peers_manager.add_peer_kind(peer_id, Some(kind), addr, None)
pub(crate) fn add_peer_kind(
&mut self,
peer_id: PeerId,
kind: Option<PeerKind>,
addr: PeerAddr,
) {
self.peers_manager.add_peer_kind(peer_id, kind, addr, None)
}
/// Connects a peer and its address with the given kind

View File

@@ -195,7 +195,7 @@ impl LaunchContext {
should_save = true;
}
} else if !reth_config.prune.is_default() {
warn!(target: "reth::cli", "Pruning configuration is present in the config file, but no CLI arguments are provided. Using config from file.");
info!(target: "reth::cli", "Pruning configuration is present in the config file, but no CLI arguments are provided. Using config from file.");
}
if should_save {

View File

@@ -79,6 +79,13 @@ where
Self::Right(r) => r.withdrawals(),
}
}
fn slot_number(&self) -> Option<u64> {
match self {
Self::Left(l) => l.slot_number(),
Self::Right(r) => r.slot_number(),
}
}
}
/// this structure enables the chaining of multiple `PayloadBuilder` implementations,

View File

@@ -72,7 +72,7 @@
//! },
//! ..Default::default()
//! };
//! let payload = EthBuiltPayload::new(Arc::new(SealedBlock::seal_slow(block)), U256::ZERO, None);
//! let payload = EthBuiltPayload::new(Arc::new(SealedBlock::seal_slow(block)), U256::ZERO, None, None);
//! Ok(payload)
//! }
//!

View File

@@ -89,6 +89,7 @@ impl PayloadJob for TestPayloadJob {
Arc::new(Block::<_>::default().seal_slow()),
U256::ZERO,
Some(Default::default()),
None,
))
}

View File

@@ -126,6 +126,28 @@ pub enum VersionSpecificValidationError {
/// root after Cancun
#[error("no parent beacon block root post-cancun")]
NoParentBeaconBlockRootPostCancun,
/// Thrown if the pre-V6 `PayloadAttributes` or `ExecutionPayload` contains a block access list
#[error("block access list not before V6")]
BlockAccessListNotSupportedBeforeV6,
/// Thrown if `engine_newPayload` contains no block access list
/// after Amsterdam
#[error("no block access list post-Amsterdam")]
NoBlockAccessListPostAmsterdam,
/// Thrown if `engine_newPayload` contains block access list
/// before Amsterdam
#[error("block access list pre-Amsterdam")]
HasBlockAccessListPreAmsterdam,
/// Thrown if the pre-V6 `PayloadAttributes` or `ExecutionPayload` contains a slot number
#[error("slot number not before V6")]
SlotNumberNotSupportedBeforeV6,
/// Thrown if `engine_newPayload` contains no slot number
/// after Amsterdam
#[error("no slot number post-Amsterdam")]
NoSlotNumberPostAmsterdam,
/// Thrown if `engine_newPayload` contains slot number
/// before Amsterdam
#[error("slot number pre-Amsterdam")]
HasSlotNumberPreAmsterdam,
}
/// Error validating payload received over `newPayload` API.

View File

@@ -156,6 +156,97 @@ pub fn validate_payload_timestamp(
return Err(EngineObjectValidationError::UnsupportedFork)
}
let is_amsterdam = chain_spec.is_amsterdam_active_at_timestamp(timestamp);
if version.is_v6() && !is_amsterdam {
// From the Engine API spec:
// <https://github.com/ethereum/execution-apis/blob/15399c2e2f16a5f800bf3f285640357e2c245ad9/src/engine/osaka.md#specification>
//
// For `engine_getPayloadV6`
//
// 1. Client software MUST return -38005: Unsupported fork error if the timestamp of the
// built payload does not fall within the time frame of the Amsterdam fork.
return Err(EngineObjectValidationError::UnsupportedFork)
}
Ok(())
}
/// Validates the presence of the `block access lists` field according to the payload timestamp.
/// After Amsterdam, block access list field must be [Some].
/// Before Amsterdam, block access list field must be [None];
pub fn validate_block_access_list_presence<T: EthereumHardforks>(
chain_spec: &T,
version: EngineApiMessageVersion,
message_validation_kind: MessageValidationKind,
timestamp: u64,
has_block_access_list: bool,
) -> Result<(), EngineObjectValidationError> {
let is_amsterdam_active = chain_spec.is_amsterdam_active_at_timestamp(timestamp);
match version {
EngineApiMessageVersion::V1 |
EngineApiMessageVersion::V2 |
EngineApiMessageVersion::V3 |
EngineApiMessageVersion::V4 |
EngineApiMessageVersion::V5 => {
if has_block_access_list {
return Err(message_validation_kind
.to_error(VersionSpecificValidationError::BlockAccessListNotSupportedBeforeV6))
}
}
EngineApiMessageVersion::V6 => {
if is_amsterdam_active && !has_block_access_list {
return Err(message_validation_kind
.to_error(VersionSpecificValidationError::NoBlockAccessListPostAmsterdam))
}
if !is_amsterdam_active && has_block_access_list {
return Err(message_validation_kind
.to_error(VersionSpecificValidationError::HasBlockAccessListPreAmsterdam))
}
}
};
Ok(())
}
/// Validates the presence of the `slot number` field according to the payload timestamp.
/// After Amsterdam, slot number field must be [Some].
/// Before Amsterdam, slot number field must be [None];
pub fn validate_slot_number_presence<T: EthereumHardforks>(
chain_spec: &T,
version: EngineApiMessageVersion,
message_validation_kind: MessageValidationKind,
timestamp: u64,
has_slot_number: bool,
) -> Result<(), EngineObjectValidationError> {
let is_amsterdam_active = chain_spec.is_amsterdam_active_at_timestamp(timestamp);
match version {
EngineApiMessageVersion::V1 |
EngineApiMessageVersion::V2 |
EngineApiMessageVersion::V3 |
EngineApiMessageVersion::V4 |
EngineApiMessageVersion::V5 => {
if has_slot_number {
return Err(message_validation_kind
.to_error(VersionSpecificValidationError::SlotNumberNotSupportedBeforeV6))
}
}
EngineApiMessageVersion::V6 => {
if is_amsterdam_active && !has_slot_number {
return Err(message_validation_kind
.to_error(VersionSpecificValidationError::NoSlotNumberPostAmsterdam))
}
if !is_amsterdam_active && has_slot_number {
return Err(message_validation_kind
.to_error(VersionSpecificValidationError::HasSlotNumberPreAmsterdam))
}
}
};
Ok(())
}
@@ -359,6 +450,25 @@ where
Type: PayloadAttributes,
T: EthereumHardforks,
{
// BAL only exists in ExecutionPayload, not PayloadAttributes (EIP-7928)
if let PayloadOrAttributes::ExecutionPayload(_) = payload_or_attrs {
validate_block_access_list_presence(
chain_spec,
version,
payload_or_attrs.message_validation_kind(),
payload_or_attrs.timestamp(),
payload_or_attrs.block_access_list().is_some(),
)?;
}
validate_slot_number_presence(
chain_spec,
version,
payload_or_attrs.message_validation_kind(),
payload_or_attrs.timestamp(),
payload_or_attrs.slot_number().is_some(),
)?;
validate_withdrawals_presence(
chain_spec,
version,

View File

@@ -56,8 +56,15 @@ pub trait ExecutionPayload:
/// Returns the total gas consumed by all transactions in this block.
fn gas_used(&self) -> u64;
/// Returns the total gas limit for this block.
fn gas_limit(&self) -> u64;
/// Returns the number of transactions in the payload.
fn transaction_count(&self) -> usize;
/// Returns the slot number included in this payload.
///
/// Returns `None` for pre-Amsterdam blocks.
fn slot_number(&self) -> Option<u64>;
}
impl ExecutionPayload for ExecutionData {
@@ -78,7 +85,7 @@ impl ExecutionPayload for ExecutionData {
}
fn block_access_list(&self) -> Option<&Bytes> {
None
self.payload.block_access_list()
}
fn parent_beacon_block_root(&self) -> Option<B256> {
@@ -93,9 +100,17 @@ impl ExecutionPayload for ExecutionData {
self.payload.as_v1().gas_used
}
fn gas_limit(&self) -> u64 {
self.payload.as_v1().gas_limit
}
fn transaction_count(&self) -> usize {
self.payload.as_v1().transactions.len()
}
fn slot_number(&self) -> Option<u64> {
self.payload.slot_number()
}
}
/// A unified type for handling both execution payloads and payload attributes.
@@ -158,6 +173,22 @@ where
Self::PayloadAttributes(_) => MessageValidationKind::PayloadAttributes,
}
}
/// Returns `block_access_list` from payload.
pub fn block_access_list(&self) -> Option<&Bytes> {
match self {
Self::ExecutionPayload(payload) => payload.block_access_list(),
Self::PayloadAttributes(_attributes) => None,
}
}
/// Returns `slot_number` from payload or attributes.
pub fn slot_number(&self) -> Option<u64> {
match self {
Self::ExecutionPayload(payload) => payload.slot_number(),
Self::PayloadAttributes(attributes) => attributes.slot_number(),
}
}
}
impl<'a, Payload, AttributesType> From<&'a AttributesType>

View File

@@ -117,6 +117,11 @@ pub trait PayloadAttributes:
///
/// `Some` for post-merge blocks, `None` for pre-merge blocks.
fn parent_beacon_block_root(&self) -> Option<B256>;
/// Returns the slot number for the new payload.
///
/// `Some` for post-Amsterdam blocks, `None` for earlier blocks.
fn slot_number(&self) -> Option<u64>;
}
impl PayloadAttributes for EthPayloadAttributes {
@@ -135,6 +140,10 @@ impl PayloadAttributes for EthPayloadAttributes {
fn parent_beacon_block_root(&self) -> Option<B256> {
self.parent_beacon_block_root
}
fn slot_number(&self) -> Option<u64> {
self.slot_number
}
}
/// Factory trait for creating payload attributes.

View File

@@ -20,7 +20,6 @@ reth-trie-common = { workspace = true, features = ["serde"] }
reth-chain-state.workspace = true
# ethereum
alloy-eip7928 = { workspace = true, features = ["serde"] }
alloy-eips.workspace = true
alloy-json-rpc.workspace = true
alloy-primitives.workspace = true

View File

@@ -1,4 +1,3 @@
use alloy_eip7928::BlockAccessList;
use alloy_eips::{BlockId, BlockNumberOrTag};
use alloy_genesis::ChainConfig;
use alloy_json_rpc::RpcObject;
@@ -163,10 +162,6 @@ pub trait DebugApi<TxReq: RpcObject> {
mode: Option<ExecutionWitnessMode>,
) -> RpcResult<ExecutionWitness>;
/// Re-executes a block and returns the Block Access List (BAL) as defined in EIP-7928.
#[method(name = "getBlockAccessList")]
async fn debug_get_block_access_list(&self, block_id: BlockId) -> RpcResult<BlockAccessList>;
/// Sets the logging backtrace location. When a backtrace location is set and a log message is
/// emitted at that location, the stack of the goroutine executing the log statement will
/// be printed to stderr.

View File

@@ -43,8 +43,8 @@ pub enum RethNewPayloadInput<ExecutionData> {
/// Reth-specific engine API extensions.
///
/// This trait provides a `reth_newPayload` endpoint that accepts either `ExecutionData` directly
/// (payload + sidecar) or an RLP-encoded block, optionally waiting for persistence and cache locks
/// before processing.
/// (payload + sidecar) or an RLP-encoded block, optionally alongside a block access list and
/// waiting for persistence and cache locks before processing.
///
/// By default, the endpoint waits for both in-flight persistence and cache updates to complete
/// before executing the payload, providing unbiased timing measurements. Each can be independently

View File

@@ -37,6 +37,14 @@ async fn test_eth_subscribe_all_supported_kinds_accept() {
serde_json::json!({"topics": ["0xddf252ad1be2c89b69c2b068fc378daa952ba7f163c4a11628f55a4df523b3ef"]}),
],
),
("transactionReceipts", vec![]),
("transactionReceipts", vec![serde_json::json!({"transactionHashes": []})]),
(
"transactionReceipts",
vec![
serde_json::json!({"transactionHashes": ["0x5c504ed432cb51138bcf09aa5e8a410dd4a1e204ef84bfed1be16dfba1b22060"]}),
],
),
];
for (kind, params) in cases {

View File

@@ -17,16 +17,19 @@ pub const CAPABILITIES: &[&str] = &[
"engine_forkchoiceUpdatedV1",
"engine_forkchoiceUpdatedV2",
"engine_forkchoiceUpdatedV3",
"engine_forkchoiceUpdatedV4",
"engine_getClientVersionV1",
"engine_getPayloadV1",
"engine_getPayloadV2",
"engine_getPayloadV3",
"engine_getPayloadV4",
"engine_getPayloadV5",
"engine_getPayloadV6",
"engine_newPayloadV1",
"engine_newPayloadV2",
"engine_newPayloadV3",
"engine_newPayloadV4",
"engine_newPayloadV5",
"engine_getPayloadBodiesByHashV1",
"engine_getPayloadBodiesByHashV2",
"engine_getPayloadBodiesByRangeV1",

View File

@@ -253,6 +253,38 @@ where
Ok(res?)
}
/// Handler for `engine_newPayloadV5`
///
/// Post-Amsterdam payload handler.
///
/// See also <https://github.com/ethereum/execution-apis/blob/main/src/engine/amsterdam.md#engine_newpayloadv5>
pub async fn new_payload_v5(
&self,
payload: PayloadT::ExecutionData,
) -> EngineApiResult<PayloadStatus> {
let payload_or_attrs = PayloadOrAttributes::<
'_,
PayloadT::ExecutionData,
PayloadT::PayloadAttributes,
>::from_execution_payload(&payload);
self.inner
.validator
.validate_version_specific_fields(EngineApiMessageVersion::V6, payload_or_attrs)?;
Ok(self.inner.beacon_consensus.new_payload(payload).await?)
}
/// Metrics version of `new_payload_v5`
pub async fn new_payload_v5_metered(
&self,
payload: PayloadT::ExecutionData,
) -> RpcResult<PayloadStatus> {
let start = Instant::now();
let res = Self::new_payload_v5(self, payload).await;
let elapsed = start.elapsed();
self.inner.metrics.latency.new_payload_v5.record(elapsed);
Ok(res?)
}
/// Returns whether the engine accepts execution requests hash.
pub fn accept_execution_requests_hash(&self) -> bool {
self.inner.accept_execution_requests_hash
@@ -542,6 +574,29 @@ where
res
}
/// Handler for `engine_getPayloadV6`
///
/// Post-Amsterdam payload handler that includes Block Access Lists (BAL).
///
/// See also <https://github.com/ethereum/execution-apis/blob/main/src/engine/amsterdam.md#engine_getpayloadv6>
pub async fn get_payload_v6(
&self,
payload_id: PayloadId,
) -> EngineApiResult<EngineT::ExecutionPayloadEnvelopeV6> {
self.get_payload_inner(payload_id, EngineApiMessageVersion::V6).await
}
/// Metrics version of `get_payload_v6`
pub async fn get_payload_v6_metered(
&self,
payload_id: PayloadId,
) -> EngineApiResult<EngineT::ExecutionPayloadEnvelopeV6> {
let start = Instant::now();
let res = Self::get_payload_v6(self, payload_id).await;
self.inner.metrics.latency.get_payload_v6.record(start.elapsed());
res
}
/// Fetches all the blocks for the provided range starting at `start`, containing `count`
/// blocks and returns the mapped payload bodies.
pub async fn get_payload_bodies_by_range_with<F, R>(
@@ -1038,20 +1093,31 @@ where
/// Handler for `engine_newPayloadV5`
///
/// Post Amsterdam payload handler. Currently returns unsupported fork error.
/// Post Amsterdam payload handler.
///
/// See also <https://github.com/ethereum/execution-apis/blob/main/src/engine/amsterdam.md#engine_newpayloadv5>
async fn new_payload_v5(
&self,
_payload: ExecutionPayloadV4,
_versioned_hashes: Vec<B256>,
_parent_beacon_block_root: B256,
_execution_requests: RequestsOrHash,
payload: ExecutionPayloadV4,
versioned_hashes: Vec<B256>,
parent_beacon_block_root: B256,
requests: RequestsOrHash,
) -> RpcResult<PayloadStatus> {
trace!(target: "rpc::engine", "Serving engine_newPayloadV5");
Err(EngineApiError::EngineObjectValidationError(
reth_payload_primitives::EngineObjectValidationError::UnsupportedFork,
))?
// Accept requests as a hash only if it is explicitly allowed.
if requests.is_hash() && !self.inner.accept_execution_requests_hash {
return Err(EngineApiError::UnexpectedRequestsHash.into());
}
let payload = ExecutionData {
payload: payload.into(),
sidecar: ExecutionPayloadSidecar::v4(
CancunPayloadFields { versioned_hashes, parent_beacon_block_root },
PraguePayloadFields { requests },
),
};
Ok(self.new_payload_v5_metered(payload).await?)
}
/// Handler for `engine_forkchoiceUpdatedV1`
@@ -1196,12 +1262,10 @@ where
/// See also <https://github.com/ethereum/execution-apis/blob/main/src/engine/amsterdam.md#engine_getpayloadv6>
async fn get_payload_v6(
&self,
_payload_id: PayloadId,
payload_id: PayloadId,
) -> RpcResult<EngineT::ExecutionPayloadEnvelopeV6> {
trace!(target: "rpc::engine", "Serving engine_getPayloadV6");
Err(EngineApiError::EngineObjectValidationError(
reth_payload_primitives::EngineObjectValidationError::UnsupportedFork,
))?
Ok(self.get_payload_v6_metered(payload_id).await?)
}
/// Handler for `engine_getPayloadBodiesByHashV1`

View File

@@ -120,7 +120,13 @@ impl From<EngineApiError> for jsonrpsee_types::error::ErrorObject<'static> {
EngineObjectValidationError::PayloadAttributes(
VersionSpecificValidationError::WithdrawalsNotSupportedInV1 |
VersionSpecificValidationError::NoWithdrawalsPostShanghai |
VersionSpecificValidationError::HasWithdrawalsPreShanghai,
VersionSpecificValidationError::HasWithdrawalsPreShanghai |
VersionSpecificValidationError::BlockAccessListNotSupportedBeforeV6 |
VersionSpecificValidationError::HasBlockAccessListPreAmsterdam |
VersionSpecificValidationError::NoBlockAccessListPostAmsterdam |
VersionSpecificValidationError::HasSlotNumberPreAmsterdam |
VersionSpecificValidationError::NoSlotNumberPostAmsterdam |
VersionSpecificValidationError::SlotNumberNotSupportedBeforeV6,
),
) |
EngineApiError::UnexpectedRequestsHash => {

View File

@@ -22,6 +22,8 @@ pub(crate) struct EngineApiLatencyMetrics {
pub(crate) new_payload_v3: Histogram,
/// Latency for `engine_newPayloadV4`
pub(crate) new_payload_v4: Histogram,
/// Latency for `engine_newPayloadV5`
pub(crate) new_payload_v5: Histogram,
/// Latency for `engine_forkchoiceUpdatedV1`
pub(crate) fork_choice_updated_v1: Histogram,
/// Latency for `engine_forkchoiceUpdatedV2`
@@ -40,6 +42,8 @@ pub(crate) struct EngineApiLatencyMetrics {
pub(crate) get_payload_v4: Histogram,
/// Latency for `engine_getPayloadV5`
pub(crate) get_payload_v5: Histogram,
/// Latency for `engine_getPayloadV6`
pub(crate) get_payload_v6: Histogram,
/// Latency for `engine_getPayloadBodiesByRangeV1`
pub(crate) get_payload_bodies_by_range_v1: Histogram,
/// Latency for `engine_getPayloadBodiesByRangeV2`

View File

@@ -517,7 +517,7 @@ pub trait EthCall: EstimateCall + Call + LoadPendingBlock + LoadBlock + FullEthA
let result = this.inspect(&mut db, evm_env.clone(), tx_env.clone(), &mut inspector)?;
let access_list = inspector.into_access_list();
let gas_used = result.result.gas_used();
let gas_used = result.result.tx_gas_used();
tx_env.set_access_list(access_list.clone());
if let Err(err) = Self::Error::ensure_success(result.result) {
return Ok(AccessListResult {
@@ -529,7 +529,7 @@ pub trait EthCall: EstimateCall + Call + LoadPendingBlock + LoadBlock + FullEthA
// transact again to get the exact gas used
let result = this.transact(&mut db, evm_env, tx_env)?;
let gas_used = result.result.gas_used();
let gas_used = result.result.tx_gas_used();
let error = Self::Error::ensure_success(result.result).err().map(|e| e.to_string());
Ok(AccessListResult { access_list, gas_used: U256::from(gas_used), error })

View File

@@ -204,7 +204,7 @@ pub trait EstimateCall: Call {
// NOTE: this is the gas the transaction used, which is less than the
// transaction requires to succeed.
let mut gas_used = res.result.gas_used();
let mut gas_used = res.result.tx_gas_used();
// the lowest value is capped by the gas used by the unconstrained transaction
let mut lowest_gas_limit = gas_used.saturating_sub(1);
@@ -225,7 +225,7 @@ pub trait EstimateCall: Call {
res = evm.transact(optimistic_tx_env).map_err(Self::Error::from_evm_err)?;
// Update the gas used based on the new result.
gas_used = res.result.gas_used();
gas_used = res.result.tx_gas_used();
// Update the gas limit estimates (highest and lowest) based on the execution result.
update_estimated_gas_range(
res.result,

View File

@@ -435,6 +435,7 @@ impl<H: BlockHeader> BuildPendingEnv<H> for NextBlockEnvAttributes {
parent_beacon_block_root: parent.parent_beacon_block_root(),
withdrawals: parent.withdrawals_root().map(|_| Default::default()),
extra_data: parent.extra_data().clone(),
slot_number: parent.slot_number().map(|slot| slot.saturating_add(1)),
}
}
}
@@ -457,4 +458,14 @@ mod tests {
assert_eq!(attrs.parent_beacon_block_root, Some(beacon_root));
}
#[test]
fn pending_env_increments_parent_slot_number() {
let header = Header { slot_number: Some(7), ..Default::default() };
let sealed = SealedHeader::new(header, B256::ZERO);
let attrs = NextBlockEnvAttributes::build_pending_env(&sealed);
assert_eq!(attrs.slot_number, Some(8));
}
}

View File

@@ -126,7 +126,7 @@ pub trait FromEvmError<Evm: ConfigureEvm>:
ExecutionResult::Success { output, .. } => Ok(output.into_data()),
ExecutionResult::Revert { output, .. } => Err(Self::from_revert(output)),
ExecutionResult::Halt { reason, gas, .. } => {
Err(Self::from_evm_halt(reason, gas.used()))
Err(Self::from_evm_halt(reason, gas.tx_gas_used()))
}
}
}

View File

@@ -559,6 +559,7 @@ where
EVMError::Header(err) => err.into(),
EVMError::Database(err) => err.into(),
EVMError::Custom(err) => Self::EvmCustom(err),
EVMError::CustomAny(err) => Self::EvmCustom(err.to_string()),
}
}
}

View File

@@ -314,7 +314,7 @@ where
code: SIMULATE_VM_ERROR_CODE,
..SimulateError::invalid_params()
}),
gas_used: gas.used(),
gas_used: gas.tx_gas_used(),
logs: Vec::new(),
status: false,
..Default::default()
@@ -329,7 +329,7 @@ where
code: SIMULATE_REVERT_CODE,
..SimulateError::invalid_params()
}),
gas_used: gas.used(),
gas_used: gas.tx_gas_used(),
status: false,
logs: Vec::new(),
..Default::default()
@@ -338,7 +338,7 @@ where
ExecutionResult::Success { output, gas, logs, .. } => SimCallResult {
return_data: output.into_data(),
error: None,
gas_used: gas.used(),
gas_used: gas.tx_gas_used(),
logs: logs
.into_iter()
.map(|log| {

View File

@@ -45,7 +45,6 @@ reth-node-api.workspace = true
reth-trie-common.workspace = true
# ethereum
alloy-eip7928.workspace = true
alloy-evm = { workspace = true, features = ["overrides"] }
alloy-consensus.workspace = true
alloy-signer.workspace = true

View File

@@ -1,5 +1,4 @@
use alloy_consensus::{transaction::TxHashRef, BlockHeader};
use alloy_eip7928::BlockAccessList;
use alloy_eips::{eip2718::Encodable2718, BlockId, BlockNumberOrTag};
use alloy_evm::{env::BlockEnvironment, Evm};
use alloy_genesis::ChainConfig;
@@ -829,10 +828,6 @@ where
Self::debug_execution_witness_by_block_hash(self, hash, mode).await.map_err(Into::into)
}
async fn debug_get_block_access_list(&self, _block_id: BlockId) -> RpcResult<BlockAccessList> {
Err(internal_rpc_err("unimplemented"))
}
async fn debug_backtrace_at(&self, _location: &str) -> RpcResult<()> {
Ok(())
}

View File

@@ -188,7 +188,7 @@ where
let gas_price = tx
.effective_tip_per_gas(basefee)
.expect("fee is always valid; execution succeeded");
let gas_used = result.gas_used();
let gas_used = result.tx_gas_used();
total_gas_used += gas_used;
let gas_fees = U256::from(gas_used) * U256::from(gas_price);

View File

@@ -2,9 +2,12 @@
use std::sync::Arc;
use alloy_consensus::{transaction::TxHashRef, BlockHeader, TxReceipt};
use alloy_primitives::TxHash;
use alloy_rpc_types_eth::{
pubsub::{Params, PubSubSyncStatus, SubscriptionKind, SyncStatusMetadata},
pubsub::{
Params, PubSubSyncStatus, SubscriptionKind, SyncStatusMetadata, TransactionReceiptsParams,
},
Filter, Log,
};
use futures::StreamExt;
@@ -13,7 +16,8 @@ use jsonrpsee::{
};
use reth_chain_state::CanonStateSubscriptions;
use reth_network_api::NetworkInfo;
use reth_rpc_convert::RpcHeader;
use reth_primitives_traits::TransactionMeta;
use reth_rpc_convert::{transaction::ConvertReceiptInput, RpcHeader};
use reth_rpc_eth_api::{
pubsub::EthPubSubApiServer, EthApiTypes, RpcConvert, RpcNodeCore, RpcTransaction,
};
@@ -185,10 +189,97 @@ where
Ok(())
}
_ => {
// TODO: implement once https://github.com/alloy-rs/alloy/pull/3410 is released
Err(invalid_params_rpc_err("Unsupported subscription kind"))
SubscriptionKind::TransactionReceipts => {
let filter = match params {
Some(Params::TransactionReceipts(filter)) => filter,
None | Some(Params::None) => TransactionReceiptsParams::default(),
_ => {
return Err(invalid_params_rpc_err("Invalid params for transactionReceipts"))
}
};
let converter = self.inner.eth_api.converter();
let stream = self.inner.eth_api.provider().canonical_state_stream().flat_map(
move |new_chain| {
// for each block in the new chain, build RPC receipts
let results: Vec<_> = new_chain
.committed()
.blocks_and_receipts()
.filter_map(|(block, receipts)| {
let block_hash = block.hash();
let block_number = block.number();
let base_fee = block.base_fee_per_gas();
let excess_blob_gas = block.excess_blob_gas();
let timestamp = block.timestamp();
let mut gas_used: u64 = 0;
let mut next_log_index: usize = 0;
// build ConvertReceiptInput for each tx+receipt pair
// (same logic as eth_getBlockReceipts HTTP endpoint)
let inputs: Vec<_> = block
.transactions_recovered()
.zip(receipts.iter())
.enumerate()
.filter_map(|(idx, (tx, receipt))| {
let gas_used_before = gas_used;
let next_log_index_before = next_log_index;
let cumulative_gas_used = receipt.cumulative_gas_used();
gas_used = cumulative_gas_used;
next_log_index += receipt.logs().len();
// apply transaction hash filter if provided
let matches = match &filter.transaction_hashes {
Some(hashes) if !hashes.is_empty() => {
hashes.contains(tx.tx_hash())
}
_ => true,
};
matches.then(|| ConvertReceiptInput {
tx,
gas_used: cumulative_gas_used - gas_used_before,
next_log_index: next_log_index_before,
meta: TransactionMeta {
tx_hash: *tx.tx_hash(),
index: idx as u64,
block_hash,
block_number,
base_fee,
excess_blob_gas,
timestamp,
},
receipt: receipt.clone(),
})
})
.collect();
if inputs.is_empty() {
return None;
}
match converter.convert_receipts(inputs) {
Ok(rpc_receipts) => Some(rpc_receipts),
Err(err) => {
error!(
target = "rpc",
%err,
"Failed to convert receipts"
);
None
}
}
})
.collect();
futures::stream::iter(results)
},
);
pipe_from_stream(accepted_sink, stream).await
}
_ => Err(invalid_params_rpc_err("Unsupported subscription kind")),
}
}
}
@@ -356,10 +447,10 @@ where
/// Returns a stream that yields all logs that match the given filter.
fn log_stream(&self, filter: Filter) -> impl Stream<Item = Log> {
BroadcastStream::new(self.eth_api.provider().subscribe_to_canonical_state())
.map(move |canon_state| {
canon_state.expect("new block subscription never ends").block_receipts()
})
self.eth_api
.provider()
.canonical_state_stream()
.map(move |canon_state| canon_state.block_receipts())
.flat_map(futures::stream::iter)
.flat_map(move |(block_receipts, removed)| {
let all_logs = logs_utils::matching_block_logs_with_tx_hashes(

View File

@@ -348,7 +348,7 @@ where
.into());
}
let gas_used = result.gas_used();
let gas_used = result.tx_gas_used();
total_gas_used += gas_used;
// coinbase is always present in the result state

View File

@@ -119,6 +119,7 @@ where
parent_beacon_block_root: request.payload_attributes.parent_beacon_block_root,
withdrawals: withdrawals.map(Into::into),
extra_data: request.extra_data.unwrap_or_default(),
slot_number: request.payload_attributes.slot_number,
};
let mut builder = evm_config
@@ -212,7 +213,7 @@ where
let requests = has_requests.then_some(outcome.execution_result.requests);
EthBuiltPayload::new(sealed_block, total_fees, requests)
EthBuiltPayload::new(sealed_block, total_fees, requests, None)
.try_into_v5()
.map_err(RethError::other)
.map_err(Eth::Error::from_eth_err)

View File

@@ -3,13 +3,23 @@
use alloy_consensus::BlockHeader;
use alloy_genesis::GenesisAccount;
use alloy_primitives::{
map::{AddressMap, B256Map, HashMap},
keccak256,
map::{AddressMap, B256Map, B256Set, HashMap},
Address, B256, U256,
};
use reth_chainspec::EthChainSpec;
use reth_codecs::Compact;
use reth_config::config::EtlConfig;
use reth_db_api::{tables, transaction::DbTxMut, DatabaseError};
use reth_db_api::{
cursor::{DbCursorRW, DbDupCursorRW},
models::{
storage_sharded_key::StorageShardedKey, AccountBeforeTx, BlockNumberAddress, IntegerList,
ShardedKey,
},
tables,
transaction::DbTxMut,
DatabaseError,
};
use reth_etl::Collector;
use reth_execution_errors::StateRootError;
use reth_primitives_traits::{
@@ -54,6 +64,11 @@ pub const DEFAULT_SOFT_LIMIT_BYTE_LEN_ACCOUNTS_CHUNK: usize = 1_000_000_000;
/// Soft limit for the number of flushed updates after which to log progress summary.
const SOFT_LIMIT_COUNT_FLUSHED_UPDATES: usize = 1_000_000;
/// Max number of storage "units" (1 per account + 1 per storage slot) before committing
/// the current MDBX transaction and opening a new one. This bounds dirty page accumulation
/// and prevents OOM on large state imports.
const STORAGE_COMMIT_THRESHOLD: usize = 500_000;
/// Storage initialization error type.
#[derive(Debug, thiserror::Error, Clone)]
pub enum InitStorageError {
@@ -460,43 +475,54 @@ where
/// It's similar to [`init_genesis`] but supports importing state too big to fit in memory, and can
/// be set to the highest block present. One practical usecase is to import OP mainnet state at
/// bedrock transition block.
pub fn init_from_state_dump<Provider>(
pub fn init_from_state_dump<PF>(
mut reader: impl BufRead,
provider_rw: &Provider,
provider_factory: &PF,
etl_config: EtlConfig,
) -> eyre::Result<B256>
where
Provider: StaticFileProviderFactory
+ DBProvider<Tx: DbTxMut>
+ BlockNumReader
+ BlockHashReader
+ ChainSpecProvider
+ StageCheckpointWriter
+ HistoryWriter
+ HeaderProvider
+ HashingWriter
+ TrieWriter
+ StateWriter
+ StorageSettingsCache
+ RocksDBProviderFactory
+ NodePrimitivesProvider
+ AsRef<Provider>,
PF: DatabaseProviderFactory<
ProviderRW: StaticFileProviderFactory
+ DBProvider<Tx: DbTxMut>
+ BlockNumReader
+ BlockHashReader
+ ChainSpecProvider
+ StageCheckpointWriter
+ HistoryWriter
+ HeaderProvider
+ HashingWriter
+ TrieWriter
+ StateWriter
+ StorageSettingsCache
+ RocksDBProviderFactory
+ NodePrimitivesProvider
+ AsRef<PF::ProviderRW>,
>,
{
if etl_config.file_size == 0 {
return Err(eyre::eyre!("ETL file size cannot be zero"))
}
let block = provider_rw.last_block_number()?;
let (block, hash, expected_state_root) = {
let provider_rw = provider_factory.database_provider_rw()?;
let block = provider_rw.last_block_number()?;
let hash = provider_rw
.block_hash(block)?
.ok_or_else(|| eyre::eyre!("Block hash not found for block {}", block))?;
let header = provider_rw
.header_by_number(block)?
.map(|h| SealedHeader::new(h, hash))
.ok_or_else(|| ProviderError::HeaderNotFound(block.into()))?;
let state_root = header.state_root();
let hash = provider_rw
.block_hash(block)?
.ok_or_else(|| eyre::eyre!("Block hash not found for block {}", block))?;
let header = provider_rw
.header_by_number(block)?
.map(|h| SealedHeader::new(h, hash))
.ok_or_else(|| ProviderError::HeaderNotFound(block.into()))?;
debug!(target: "reth::cli",
block,
chain=%provider_rw.chain_spec().chain(),
"Initializing state at block"
);
let expected_state_root = header.state_root();
(block, hash, state_root)
};
// first line can be state root
let dump_state_root = parse_state_root(&mut reader)?;
@@ -504,7 +530,6 @@ where
error!(target: "reth::cli",
?dump_state_root,
?expected_state_root,
header=?header.num_hash(),
"State root from state dump does not match state root in current header."
);
return Err(InitStorageError::StateRootMismatch(GotExpected {
@@ -514,26 +539,24 @@ where
.into())
}
debug!(target: "reth::cli",
block,
chain=%provider_rw.chain_spec().chain(),
"Initializing state at block"
);
// remaining lines are accounts
let collector = parse_accounts(&mut reader, etl_config)?;
// write state to db
dump_state(collector, provider_rw, block)?;
// write state to db with chunked commits to avoid OOM
dump_state(collector, provider_factory, block)?;
info!(target: "reth::cli", "All accounts written to database, starting state root computation (may take some time)");
// clear trie tables so state root is computed from scratch
provider_rw.tx_ref().clear::<tables::AccountsTrie>()?;
provider_rw.tx_ref().clear::<tables::StoragesTrie>()?;
{
let provider_rw = provider_factory.database_provider_rw()?;
provider_rw.tx_ref().clear::<tables::AccountsTrie>()?;
provider_rw.tx_ref().clear::<tables::StoragesTrie>()?;
provider_rw.commit()?;
}
// compute and compare state root. this advances the stage checkpoints.
let computed_state_root = compute_state_root(provider_rw, None)?;
// compute and compare state root
let computed_state_root = compute_state_root_chunked(provider_factory)?;
if computed_state_root == expected_state_root {
info!(target: "reth::cli",
?computed_state_root,
@@ -554,8 +577,12 @@ where
}
// insert sync stages for stages that require state
for stage in StageId::STATE_REQUIRED {
provider_rw.save_stage_checkpoint(stage, StageCheckpoint::new(block))?;
{
let provider_rw = provider_factory.database_provider_rw()?;
for stage in StageId::STATE_REQUIRED {
provider_rw.save_stage_checkpoint(stage, StageCheckpoint::new(block))?;
}
provider_rw.commit()?;
}
Ok(hash)
@@ -597,70 +624,170 @@ fn parse_accounts(
Ok(collector)
}
/// Takes a [`Collector`] and processes all accounts.
fn dump_state<Provider>(
/// Takes a [`Collector`] and writes all accounts directly to database tables.
///
/// This bypasses the higher-level `insert_state`/`insert_genesis_hashes`/`insert_history`
/// functions which build intermediate structures (`BundleStateInit`, `RevertsInit`,
/// `ExecutionOutcome`) that duplicate all storage data 2-3x in memory. For accounts with
/// millions of storage entries this causes OOM.
///
/// Instead, each account is written directly to all required tables using cursor operations,
/// using `append`/`append_dup` for sorted tables where possible (MDBX fast path that skips
/// B-tree traversal). Commits happen every [`STORAGE_COMMIT_THRESHOLD`] storage units to
/// bound MDBX dirty page accumulation.
///
/// NOTE: This function is not idempotent. If the process crashes mid-import, the database
/// must be wiped before retrying.
fn dump_state<PF>(
mut collector: Collector<Address, GenesisAccount>,
provider_rw: &Provider,
provider_factory: &PF,
block: u64,
) -> Result<(), eyre::Error>
where
Provider: StaticFileProviderFactory
+ DBProvider<Tx: DbTxMut>
+ HeaderProvider
+ HashingWriter
+ HistoryWriter
+ StateWriter
+ StorageSettingsCache
+ RocksDBProviderFactory
+ NodePrimitivesProvider
+ AsRef<Provider>,
PF: DatabaseProviderFactory<ProviderRW: DBProvider<Tx: DbTxMut>>,
{
let accounts_len = collector.len();
let mut accounts = Vec::new();
let mut total_inserted_accounts = 0;
let mut chunk_byte_size = 0;
let mut total_accounts: usize = 0;
let mut storage_units: usize = 0;
for (index, entry) in collector.iter()?.enumerate() {
let (address, account) = entry?;
chunk_byte_size += address.len() + account.len();
let (address, _) = Address::from_compact(address.as_slice(), address.len());
let (account, _) = GenesisAccount::from_compact(account.as_slice(), account.len());
// pre-allocate the history list once — every entry uses the same single-block bitmap
let history_list = IntegerList::new([block])?;
accounts.push((address, account));
// track seen bytecode hashes to avoid re-hashing and re-writing duplicates
let mut seen_bytecodes: B256Set = B256Set::default();
if chunk_byte_size >= DEFAULT_SOFT_LIMIT_BYTE_LEN_ACCOUNTS_CHUNK ||
index == accounts_len - 1
{
total_inserted_accounts += accounts.len();
let mut provider_rw = provider_factory.database_provider_rw()?;
for entry in collector.iter()? {
let (address_raw, account_raw) = entry?;
let (address, _) = Address::from_compact(address_raw.as_slice(), address_raw.len());
let (account, _) = GenesisAccount::from_compact(account_raw.as_slice(), account_raw.len());
let account_storage_len = account.storage.as_ref().map_or(0, |s| s.len());
let account_units = 1 + account_storage_len;
// commit before this account would push us over the threshold
if storage_units > 0 && storage_units + account_units > STORAGE_COMMIT_THRESHOLD {
provider_rw.commit()?;
provider_rw = provider_factory.database_provider_rw()?;
info!(target: "reth::cli",
total_inserted_accounts,
"Writing accounts to db"
total_accounts,
accounts_len,
storage_units,
"Committed chunk"
);
storage_units = 0;
}
// use transaction to insert genesis header
insert_genesis_hashes(
provider_rw,
accounts.iter().map(|(address, account)| (address, account)),
)?;
write_account_to_db(
provider_rw.tx_ref(),
&address,
&account,
block,
&history_list,
&mut seen_bytecodes,
)?;
insert_history(
provider_rw,
accounts.iter().map(|(address, account)| (address, account)),
block,
)?;
total_accounts += 1;
storage_units += account_units;
// block is already written to static files
insert_state(
provider_rw,
accounts.iter().map(|(address, account)| (address, account)),
block,
)?;
accounts.clear();
chunk_byte_size = 0;
if total_accounts.is_multiple_of(100_000) {
info!(target: "reth::cli", total_accounts, accounts_len, "Writing accounts...");
}
}
// commit final batch
provider_rw.commit()?;
info!(target: "reth::cli", total_accounts, "All accounts written to database");
Ok(())
}
/// Writes a single account and all its storage to every required DB table directly,
/// without building intermediary structures.
///
/// Uses `append_dup` for `DupSort` tables where insertion order matches key order (the ETL
/// collector sorts by address, so `AccountChangeSets`, `PlainStorageState`, and
/// `StorageChangeSets` receive data in sorted order within each account). For `HashedAccounts`
/// and `HashedStorages`, insertion order is unsorted (keccak scrambles address order), so we
/// use `put`/`upsert` which do a full B-tree lookup.
fn write_account_to_db<TX: DbTxMut>(
tx: &TX,
address: &Address,
genesis_account: &GenesisAccount,
block: u64,
history_list: &IntegerList,
seen_bytecodes: &mut B256Set,
) -> Result<(), eyre::Error> {
let bytecode_hash = if let Some(code) = &genesis_account.code {
let bytecode = Bytecode::new_raw_checked(code.clone())
.map_err(|e| eyre::eyre!("Invalid bytecode for {address}: {e}"))?;
let hash = bytecode.hash_slow();
if seen_bytecodes.insert(hash) {
tx.put::<tables::Bytecodes>(hash, bytecode)?;
}
Some(hash)
} else {
None
};
let account = Account {
nonce: genesis_account.nonce.unwrap_or_default(),
balance: genesis_account.balance,
bytecode_hash,
};
let hashed_address = keccak256(address);
// plain state — sorted by address (ETL order), use append
tx.put::<tables::PlainAccountState>(*address, account)?;
// hashed state — unsorted (keccak scrambles order), must use put
tx.put::<tables::HashedAccounts>(hashed_address, account)?;
// account changeset — DupSort keyed by block, subkey sorted by address (ETL order)
let mut acct_cs_cursor = tx.cursor_dup_write::<tables::AccountChangeSets>()?;
acct_cs_cursor.append_dup(block, AccountBeforeTx { address: *address, info: None })?;
// account history
tx.put::<tables::AccountsHistory>(ShardedKey::new(*address, u64::MAX), history_list.clone())?;
// storage entries
if let Some(storage) = &genesis_account.storage {
let mut hashed_storage_cursor = tx.cursor_dup_write::<tables::HashedStorages>()?;
let mut plain_storage_cursor = tx.cursor_dup_write::<tables::PlainStorageState>()?;
let mut storage_cs_cursor = tx.cursor_dup_write::<tables::StorageChangeSets>()?;
// sort storage slots by key so we can use append_dup for plain/changeset tables
let mut sorted_slots: Vec<_> = storage.iter().collect();
sorted_slots.sort_unstable_by_key(|(k, _)| *k);
for &(&key, &value) in &sorted_slots {
let value_u256 = U256::from_be_bytes(value.0);
// plain storage — sorted by (address, key), use append_dup
plain_storage_cursor.append_dup(*address, StorageEntry { key, value: value_u256 })?;
// hashed storage — unsorted keccak order, use upsert
let hashed_key = keccak256(key);
hashed_storage_cursor
.upsert(hashed_address, &StorageEntry { key: hashed_key, value: value_u256 })?;
// storage changeset — sorted by (block, address), then by key via append_dup
storage_cs_cursor.append_dup(
BlockNumberAddress((block, *address)),
StorageEntry { key, value: U256::ZERO },
)?;
// storage history
tx.put::<tables::StoragesHistory>(
StorageShardedKey::new(*address, key, u64::MAX),
history_list.clone(),
)?;
}
}
Ok(())
}
@@ -738,6 +865,81 @@ where
}
}
/// Computes the state root (from scratch) with periodic commits to free MDBX dirty pages.
///
/// Opens a fresh transaction each iteration to release dirty pages, preventing OOM on large
/// states where trie updates accumulate gigabytes of MDBX dirty pages.
fn compute_state_root_chunked<PF>(provider_factory: &PF) -> Result<B256, InitStorageError>
where
PF: DatabaseProviderFactory<
ProviderRW: DBProvider<Tx: DbTxMut> + TrieWriter + StorageSettingsCache,
>,
{
let provider_rw = provider_factory.database_provider_rw().map_err(provider_db_err)?;
reth_trie_db::with_adapter!(&provider_rw, |A| {
drop(provider_rw);
compute_state_root_chunked_inner::<PF, A>(provider_factory)
})
}
fn compute_state_root_chunked_inner<PF, A>(provider_factory: &PF) -> Result<B256, InitStorageError>
where
PF: DatabaseProviderFactory<
ProviderRW: DBProvider<Tx: DbTxMut> + TrieWriter + StorageSettingsCache,
>,
A: reth_trie_db::TrieTableAdapter,
{
trace!(target: "reth::cli", "Computing state root");
let mut intermediate_state: Option<IntermediateStateRootState> = None;
let mut total_flushed_updates = 0;
loop {
let provider_rw = provider_factory.database_provider_rw().map_err(provider_db_err)?;
let tx = provider_rw.tx_ref();
let state_root =
DbStateRoot::<_, A>::from_tx(tx).with_intermediate_state(intermediate_state.take());
match state_root.root_with_progress()? {
StateRootProgress::Progress(state, _, updates) => {
let updated_len = provider_rw.write_trie_updates(updates)?;
total_flushed_updates += updated_len;
info!(target: "reth::cli",
last_account_key = %state.account_root_state.last_hashed_key,
updated_len,
total_flushed_updates,
"Flushing trie updates (committing to free memory)"
);
intermediate_state = Some(*state);
provider_rw.commit().map_err(provider_db_err)?;
}
StateRootProgress::Complete(root, _, updates) => {
let updated_len = provider_rw.write_trie_updates(updates)?;
total_flushed_updates += updated_len;
info!(target: "reth::cli",
%root,
updated_len,
total_flushed_updates,
"State root computation complete"
);
provider_rw.commit().map_err(provider_db_err)?;
return Ok(root)
}
}
}
}
/// Converts a provider error into an [`InitStorageError`].
fn provider_db_err(e: impl std::fmt::Display) -> InitStorageError {
InitStorageError::from(StateRootError::Database(DatabaseError::Other(e.to_string())))
}
/// Type to deserialize state root from state dump file.
#[derive(Debug, Serialize, Deserialize, PartialEq, Eq)]
struct StateRoot {

View File

@@ -45,6 +45,7 @@ pub fn increase_thread_priority() {
/// Should be called once after tracing is initialized.
///
/// No-op on non-Linux platforms.
#[allow(clippy::missing_const_for_fn)]
pub fn deprioritize_background_threads() {
#[cfg(target_os = "linux")]
_deprioritize_background_threads();

View File

@@ -12,7 +12,7 @@ exclude.workspace = true
# obs
opentelemetry_sdk = { workspace = true, optional = true }
opentelemetry = { workspace = true, optional = true }
opentelemetry-otlp = { workspace = true, optional = true, features = ["grpc-tonic"] }
opentelemetry-otlp = { workspace = true, optional = true, features = ["grpc-tonic", "reqwest-rustls"] }
opentelemetry-semantic-conventions = { workspace = true, optional = true }
opentelemetry-appender-tracing = { workspace = true, optional = true }
tracing-opentelemetry = { workspace = true, optional = true }

View File

@@ -1414,7 +1414,7 @@ pub fn ensure_intrinsic_gas<T: EthPoolTransaction>(
);
let gas_limit = transaction.gas_limit();
if gas_limit < gas.initial_gas || gas_limit < gas.floor_gas {
if gas_limit < gas.initial_total_gas || gas_limit < gas.floor_gas {
Err(InvalidPoolTransactionError::IntrinsicGasTooLow)
} else {
Ok(())

View File

@@ -1,7 +1,7 @@
use super::TrieCursor;
use crate::{BranchNodeCompact, Nibbles};
use reth_storage_errors::db::DatabaseError;
use std::cmp::Ordering;
use std::{cmp::Ordering, iter::FusedIterator};
use tracing::trace;
/// Compares two Nibbles in depth-first order.
@@ -118,12 +118,15 @@ impl<C: TrieCursor> Iterator for DepthFirstTrieIterator<C> {
}
if let Err(err) = self.fill_next() {
self.complete = true;
return Some(Err(err))
}
}
}
}
impl<C: TrieCursor> FusedIterator for DepthFirstTrieIterator<C> {}
#[cfg(test)]
mod tests {
use super::*;
@@ -131,6 +134,35 @@ mod tests {
use alloy_trie::TrieMask;
use std::{collections::BTreeMap, sync::Arc};
/// A trie cursor that always returns an error on seek/next.
struct FailingTrieCursor;
impl TrieCursor for FailingTrieCursor {
fn seek_exact(
&mut self,
_key: Nibbles,
) -> Result<Option<(Nibbles, BranchNodeCompact)>, DatabaseError> {
Err(DatabaseError::Other("test error".to_string()))
}
fn seek(
&mut self,
_key: Nibbles,
) -> Result<Option<(Nibbles, BranchNodeCompact)>, DatabaseError> {
Err(DatabaseError::Other("test error".to_string()))
}
fn next(&mut self) -> Result<Option<(Nibbles, BranchNodeCompact)>, DatabaseError> {
Err(DatabaseError::Other("test error".to_string()))
}
fn current(&mut self) -> Result<Option<Nibbles>, DatabaseError> {
Err(DatabaseError::Other("test error".to_string()))
}
fn reset(&mut self) {}
}
fn create_test_node(state_nibbles: &[u8], tree_nibbles: &[u8]) -> BranchNodeCompact {
let mut state_mask = TrieMask::default();
for &nibble in state_nibbles {
@@ -371,4 +403,14 @@ mod tests {
assert_eq!(actual_order, expected_order);
}
#[test]
fn test_iterator_terminates_on_error() {
let mut iter = DepthFirstTrieIterator::new(FailingTrieCursor);
// First call should return the error.
assert!(iter.next().unwrap().is_err());
// Iterator must be fused after the error — no infinite error loop.
assert!(iter.next().is_none());
}
}

View File

@@ -144,13 +144,31 @@ Storage:
Local path to a snapshot manifest.json for modular component downloads
--with-txs
Include transaction static files
Include all transaction static files
--with-txs-since <BLOCK_NUMBER>
Include transaction static files starting at the specified block
--with-txs-distance <BLOCKS>
Include transaction static files covering the last N blocks
--with-receipts
Include receipt static files
Include all receipt static files
--with-receipts-since <BLOCK_NUMBER>
Include receipt static files starting at the specified block
--with-receipts-distance <BLOCKS>
Include receipt static files covering the last N blocks
--with-state-history
Include account and storage history static files
Include all account and storage history static files
--with-state-history-since <BLOCK_NUMBER>
Include account and storage history static files starting at the specified block
--with-state-history-distance <BLOCKS>
Include account and storage history static files covering the last N blocks
--with-senders
Include transaction sender static files. Requires `--with-txs`

View File

@@ -5,7 +5,7 @@
use alloy_eips::eip4895::Withdrawal;
use alloy_evm::{
block::{BlockExecutorFactory, BlockExecutorFor, ExecutableTx},
block::{BlockExecutorFactory, BlockExecutorFor, ExecutableTx, GasOutput},
eth::{EthBlockExecutionCtx, EthBlockExecutor, EthTxResult},
precompiles::PrecompilesMap,
revm::context::Block as _,
@@ -211,7 +211,10 @@ where
self.inner.execute_transaction_without_commit(tx)
}
fn commit_transaction(&mut self, output: Self::Result) -> Result<u64, BlockExecutionError> {
fn commit_transaction(
&mut self,
output: Self::Result,
) -> Result<GasOutput, BlockExecutionError> {
self.inner.commit_transaction(output)
}

View File

@@ -97,6 +97,10 @@ impl PayloadAttributes for CustomPayloadAttributes {
fn parent_beacon_block_root(&self) -> Option<B256> {
self.inner.parent_beacon_block_root()
}
fn slot_number(&self) -> Option<u64> {
self.inner.slot_number()
}
}
/// Custom engine types - uses a custom payload attributes RPC type, but uses the default

View File

@@ -22,7 +22,7 @@ use reth_ethereum::{
context_interface::result::{EVMError, HaltReason},
inspector::{Inspector, NoOpInspector},
interpreter::interpreter::EthInterpreter,
precompile::{PrecompileOutput, PrecompileResult, Precompiles},
precompile::{PrecompileOutput, Precompiles},
primitives::hardfork::SpecId,
MainBuilder, MainContext,
},
@@ -110,7 +110,7 @@ pub fn prague_custom() -> &'static Precompiles {
let precompile = Precompile::new(
PrecompileId::custom("custom"),
address!("0x0000000000000000000000000000000000000999"),
|_, _| PrecompileResult::Ok(PrecompileOutput::new(0, Bytes::new())),
|_, _, _| Ok(PrecompileOutput::new(0, Bytes::new(), 0)),
);
precompiles.extend([precompile]);
precompiles

View File

@@ -252,7 +252,7 @@ fn run_case(case: &BlockchainTest) -> Result<(), Error> {
.map_err(|err| Error::block_failed(block_number, err))?;
// Consensus checks after block execution
validate_block_post_execution(block, &chain_spec, &output.receipts, &output.requests, None)
validate_block_post_execution(block, &chain_spec, &output, None)
.map_err(|err| Error::block_failed(block_number, err))?;
// Compute and check the post state root