Compare commits

..

62 Commits

Author SHA1 Message Date
yongkangc
815efc5927 feat(payload): prototype cached payload builder using engine-tree caches
Adds EthereumPayloadBuilder2 that demonstrates using the engine-tree's
three caches (execution cache, precompile cache, sparse trie) inside the
payload builder for faster block building.

Changes:
- New builder2.rs with EthereumPayloadBuilder2 and cached_ethereum_payload()
- Make payload_processor types public (SharedPreservedSparseTrie,
  PreservedSparseTrie, PreservedTrieGuard, SparseTrie, PayloadExecutionCache)
- SparseTrieStateProvider wrapper delegating all StateProvider traits
- Sparse trie state root is a stub (falls through to standard computation)
2026-03-19 06:16:58 +00:00
Derek Cofausper
dd51a75a78 chore(ci): remove pull_request trigger from bench-scheduled (#23105)
Co-authored-by: Alexey Shekhirin <5773434+shekhirin@users.noreply.github.com>
2026-03-19 00:46:24 +00:00
Ayush Baluni
a14db7f0ca fix(net): disable Discv5 ENR auto-update when NAT disabled or explicit addr set (#23075) 2026-03-18 21:47:31 +00:00
Derek Cofausper
c91845ae44 feat(prune): make minimum pruning distance configurable (#23082)
Co-authored-by: Dan Cline <6798349+Rjected@users.noreply.github.com>
2026-03-18 21:07:43 +00:00
Derek Cofausper
f61098ec00 fix(provider): gate rocksdb jemalloc behind feature flag (#23061)
Co-authored-by: Dan Cline <6798349+Rjected@users.noreply.github.com>
2026-03-18 18:53:30 +00:00
Tim
240fcf164e feat: add nightly bench runs (#23095) 2026-03-18 18:26:50 +00:00
Derek Cofausper
365b6274da ci(bench): add otlp toggle argument (#23092)
Co-authored-by: Alexey Shekhirin <5773434+shekhirin@users.noreply.github.com>
2026-03-18 17:14:52 +00:00
Sergei Shulepov
ab90477ed6 fix(trie): another branch collapse edge-case (#23089)
Co-authored-by: Amp <amp@ampcode.com>
2026-03-18 17:09:16 +00:00
Arsenii Kulikov
2778a063ad fix: use zero gas price for empty blocks (#23094) 2026-03-18 17:04:50 +00:00
Dan Cline
a83d5453bd fix(provider): fix race between save_blocks and rocksdb pruning (#23081) 2026-03-18 16:58:14 +00:00
Chase Wright
ce1d091ad2 fix(ethstats): Re-enable TLS in tokio-tungstenite (#23090)
Co-authored-by: Matthias Seitz <matthias.seitz@outlook.de>
2026-03-18 16:57:13 +00:00
AKABABA-ETH
10b1b4522c fix(p2p): apply sessions config from reth.toml in p2p subcommand (#23078) 2026-03-18 12:36:26 +00:00
Derek Cofausper
7bf9241fe6 fix(provider): disable read transaction timeout during check_consistency (#23083)
Co-authored-by: Arsenii Kulikov <62447812+klkvr@users.noreply.github.com>
Co-authored-by: Alexey Shekhirin <github@shekhirin.com>
2026-03-18 12:07:25 +00:00
Artyom Bakhtin
1a0e982ead fix(metrics): Rename more instances of invalid save_blocks_block_count (#22915)
Signed-off-by: bakhtin <a@bakhtin.net>
2026-03-18 11:59:24 +00:00
Nicolas SSS
d148f39cca refactor(chainspec): remove unused once_cell_set utility (#23043) 2026-03-18 11:54:16 +00:00
Crypto Nomad
7c53936634 fix(rpc): export EthConfigApi in aggregate modules (#23068) 2026-03-18 13:12:34 +01:00
stevencartavia
a9ff59fc64 perf(rpc): avoid request clone in eth_createAccessList (#23085) 2026-03-18 10:58:19 +00:00
stevencartavia
5de969a1be perf(rpc): avoid cloning tx in pending block builder (#23077) 2026-03-18 09:45:44 +00:00
Derek Cofausper
ae2c916f61 refactor(storage): use RocksReadSnapshot for read-only compatible RocksDB reads (#23067)
Co-authored-by: Tim <12827757+laibe@users.noreply.github.com>
Co-authored-by: joshieDo <93316087+joshieDo@users.noreply.github.com>
Co-authored-by: Amp <amp@ampcode.com>
2026-03-17 18:03:08 +00:00
Brian Picciano
6097cf9ee7 fix(trie): Fix branch collapse edge-cases in ArenaParallelSparseTrie (#23053)
Signed-off-by: Delweng <delweng@gmail.com>
Co-authored-by: Amp <amp@ampcode.com>
Co-authored-by: stevencartavia <112043913+stevencartavia@users.noreply.github.com>
Co-authored-by: Derek Cofausper <256792747+decofe@users.noreply.github.com>
Co-authored-by: Alexey Shekhirin <5773434+shekhirin@users.noreply.github.com>
Co-authored-by: MagicJoshh <subhshubham398@gmail.com>
Co-authored-by: Delweng <delweng@gmail.com>
Co-authored-by: github-actions[bot] <41898282+github-actions[bot]@users.noreply.github.com>
Co-authored-by: github-merge-queue <118344674+github-merge-queue@users.noreply.github.com>
Co-authored-by: Huber <HuberyJulianay@gmail.com>
Co-authored-by: Sergei Shulepov <2205845+pepyakin@users.noreply.github.com>
Co-authored-by: Olivier Dupont <olivierdupontvier@gmail.com>
Co-authored-by: YK <chiayongkang@hotmail.com>
Co-authored-by: Crypto Nomad <cryptonomadkripto@gmail.com>
Co-authored-by: ligt <me@ligt.dev>
Co-authored-by: Sergei Shulepov <pep@tempo.xyz>
2026-03-17 17:10:23 +00:00
stevencartavia
75fa61377a perf(rpc): avoid redundant next_env_attributes call in simulate_v1 (#23064) 2026-03-17 16:15:55 +00:00
Derek Cofausper
de3033d285 fix(provider): add ensure_canonical_block guard to history_by_block_hash (#22876)
Co-authored-by: Matthias Seitz <19890894+mattsse@users.noreply.github.com>
Co-authored-by: joshieDo <93316087+joshieDo@users.noreply.github.com>
2026-03-17 16:11:32 +00:00
Delweng
55ed7d5bb5 perf(engine): check hashmap instead of clone (#23071)
Signed-off-by: Delweng <delweng@gmail.com>
2026-03-17 14:00:45 +00:00
Derek Cofausper
a0b0d8854c fix(storage): preserve genesis history entries in RocksDB consistency check (#23033)
Co-authored-by: Arsenii Kulikov <62447812+klkvr@users.noreply.github.com>
Co-authored-by: Arsenii Kulikov <klkvrr@gmail.com>
2026-03-17 12:35:04 +00:00
Brian Picciano
5e744326a4 feat(trie): proof_v2 prefix set support (#22946)
Co-authored-by: Amp <amp@ampcode.com>
2026-03-17 12:03:25 +00:00
Delweng
0aff4cc8da fix(net): treat malformed blob sidecar responses as peer misbehavior (#23035)
Signed-off-by: Delweng <delweng@gmail.com>
2026-03-17 10:59:50 +00:00
theo
58142d5e16 chore: remove op-revm dep (#23059) 2026-03-17 10:33:41 +00:00
Derek Cofausper
b7eb508484 feat(fs-util): add remove_file_if_exists helper (#23065)
Co-authored-by: Matthias Seitz <19890894+mattsse@users.noreply.github.com>
2026-03-17 10:32:53 +00:00
MagicJoshh
d8ae156f64 fix(rpc): export Client traits instead of Server in clients module (#23058) 2026-03-17 09:43:43 +00:00
Brian Picciano
35dc30561f perf(trie): call update_subtrie_hashes after every update (#23052) 2026-03-16 17:16:05 +00:00
ligt
5e1e994d11 chore(engine-tree): simplify return type of canonical_block_by_hash (#23048) 2026-03-16 11:56:42 +00:00
Crypto Nomad
ce850c4fc3 fix(rpc): clone EthSigner trait objects with generic tx request (#23050) 2026-03-16 11:11:55 +00:00
Olivier Dupont
89bc38be1c fix(rpc): remove redundant TransportRpcModuleConfig clone in builder (#22945)
Co-authored-by: YK <chiayongkang@hotmail.com>
2026-03-16 10:22:26 +00:00
Derek Cofausper
acdbd065e2 chore(bench): add rich job summary matching Slack output (#23046)
Co-authored-by: Sergei Shulepov <2205845+pepyakin@users.noreply.github.com>
2026-03-16 10:12:35 +00:00
Huber
62f48893a9 fix(p2p): respect --bootnodes flag in reth p2p commands (#23040) 2026-03-15 08:51:54 +00:00
github-actions[bot]
5ef6620060 chore(deps): weekly cargo update (#23041)
Co-authored-by: github-merge-queue <118344674+github-merge-queue@users.noreply.github.com>
2026-03-15 08:45:07 +00:00
Delweng
d3d7fb31d7 fix(txpool): use ceiling division for replacement tx price bump check (#23012)
Signed-off-by: Delweng <delweng@gmail.com>
2026-03-14 07:56:10 +00:00
Delweng
93cb8934ea fix(net): fully remove disconnected peers from transaction state (#23014)
Signed-off-by: Delweng <delweng@gmail.com>
2026-03-14 04:25:53 +00:00
MagicJoshh
a20d1fb1ef fix(rpc): disable fee charge in eth_createAccessList (#23026) 2026-03-14 02:11:16 +00:00
Derek Cofausper
2178b44224 ci(bench): schedule bench job only on runners tagged available (#23027)
Co-authored-by: Alexey Shekhirin <5773434+shekhirin@users.noreply.github.com>
2026-03-13 17:33:04 +00:00
stevencartavia
46a6ee49e1 perf(rpc): avoid hash_slow in reward traces (#23011) 2026-03-13 16:08:45 +00:00
Brian Picciano
a047de9200 chore(grafana): update State Root Task dashboard panels (#23020) 2026-03-13 13:48:52 +00:00
Rej Ect
5f9810c01b fix(chain-state): correct return type of NewCanonicalChain::tip() (#23018)
Co-authored-by: Matthias Seitz <matthias.seitz@outlook.de>
2026-03-13 11:30:43 +00:00
Matthias Seitz
792ee9245f fix(pool): prevent sender-id map growth on read-only sender+nonce lookups (#23008)
Co-authored-by: theobhau183919-ux <theobhau183919@gmail.com>
2026-03-13 11:28:11 +00:00
figtracer
035b021837 chore(docker): bump lighthouse to v8.1.2 (#23002)
Co-authored-by: YK <chiayongkang@hotmail.com>
2026-03-13 11:28:10 +00:00
Matthias Seitz
b05a689c46 fix(net): gate serde-only imports behind feature flag (#23010) 2026-03-13 11:16:14 +00:00
MagicJoshh
2baacf93a3 fix(rpc): eth_config returns wrong fork (#23007)
Co-authored-by: DaniPopes <57450786+DaniPopes@users.noreply.github.com>
2026-03-13 10:03:55 +00:00
Derek Cofausper
0d8d48a16e ci: bump state tests runner to depot-ubuntu-latest-8 (#23017)
Co-authored-by: DaniPopes <57450786+DaniPopes@users.noreply.github.com>
2026-03-13 09:08:15 +00:00
Derek Cofausper
26f0e59155 ci: disable PGO by default, rename input to pgo (#23016)
Co-authored-by: DaniPopes <57450786+DaniPopes@users.noreply.github.com>
2026-03-13 09:00:53 +00:00
Abhijit Roy
3b75817086 fix(primitives): enable serde for RPC receipt test in reth-ethereum-primitives (#22983)
Co-authored-by: Matthias Seitz <matthias.seitz@outlook.de>
2026-03-13 08:37:55 +01:00
stevencartavia
9fdafb70f5 perf: avoid redundant seal_slow when hash is known (#23009) 2026-03-13 08:36:29 +01:00
Delweng
28e067432a fix(net): send disconnect on invalid inbound eth messages (#22986)
Signed-off-by: Delweng <delweng@gmail.com>
2026-03-12 22:27:53 +00:00
Derek Cofausper
c73274cc82 chore(bench): limit reth memory to 95% of available RAM (#23005)
Co-authored-by: Alexey Shekhirin <5773434+shekhirin@users.noreply.github.com>
2026-03-12 22:21:56 +00:00
Alexey Shekhirin
9060c5059e ci(bench): push OTLP traces and logs to VictoriaTraces/VictoriaLogs (#22999) 2026-03-12 17:47:30 +00:00
Derek Cofausper
b9969c5b1c chore: remove rocksdb and edge feature gates, default to storage v2 (#22954)
Co-authored-by: Alexey Shekhirin <5773434+shekhirin@users.noreply.github.com>
2026-03-12 16:59:18 +00:00
Dan Cline
b37b881074 feat(node-builder): add with_rocksdb_provider to NodeBuilder (#22970)
Co-authored-by: Arsenii Kulikov <klkvrr@gmail.com>
2026-03-12 16:42:59 +00:00
Brian Picciano
9b53c4fa39 chore(trie): address arena PR review feedback (#22996)
Co-authored-by: Amp <amp@ampcode.com>
2026-03-12 16:04:02 +00:00
Derek Cofausper
6cd0f843a8 fix(rpc): disable fee charge for eth_estimateGas (#22959)
Co-authored-by: Arsenii Kulikov <62447812+klkvr@users.noreply.github.com>
Co-authored-by: Arsenii Kulikov <klkvrr@gmail.com>
2026-03-12 15:58:07 +00:00
Sergei Shulepov
47f5653a55 fix(bench): guard abba run steps on BENCH_ABBA flag (#22981) 2026-03-12 15:50:40 +00:00
Derek Cofausper
c0f6997352 feat(bench): show baseline/feature CLI args in Slack notification (#22997)
Co-authored-by: Alexey Shekhirin <5773434+shekhirin@users.noreply.github.com>
2026-03-12 15:29:57 +00:00
Derek Cofausper
6a62c38498 ci(docker): add disable_pgo input for workflow dispatch (#22960)
Co-authored-by: Alexey Shekhirin <5773434+shekhirin@users.noreply.github.com>
Co-authored-by: Alexey Shekhirin <github@shekhirin.com>
2026-03-12 14:12:27 +00:00
Derek Cofausper
294e215077 fix(provider): heal finalized/safe block numbers ahead of highest header (#22995)
Co-authored-by: joshieDo <93316087+joshieDo@users.noreply.github.com>
2026-03-12 13:55:48 +00:00
137 changed files with 5435 additions and 2850 deletions

View File

@@ -0,0 +1,6 @@
---
reth-trie-common: minor
reth-trie: minor
---
Added `contains_range` method to `PrefixSet` for checking if any key falls within a half-open range. Added prefix set support to `ProofCalculator` via `with_prefix_set`, enabling stale cached hash invalidation and branch collapse detection when keys are inserted or removed; propagated storage prefix sets through `SyncAccountValueEncoder`.

View File

@@ -0,0 +1,5 @@
---
reth-trie-sparse: patch
---
Refactored arena trie internals by adding a `BranchChildIdx::sibling()` helper, deduplicating `Index`/`NodeArena` type aliases, and replacing `is_empty()` with a `drop_root()` method. Fixed a bug where `cursor.pop()` was called before checking if the leaf was the root node, which could cause incorrect dirty-state propagation.

View File

@@ -0,0 +1,5 @@
---
reth-trie-sparse: minor
---
Fixed a bug in `ArenaParallelSparseTrie` where subtrie updates that would completely empty a subtrie were incorrectly dispatched to parallel workers instead of being processed inline, preventing correct branch collapse detection when blinded siblings are present. Refactored the `SparseTrie` test suite to accept a `fn() -> T` factory instead of requiring `T: Default`, enabling a new `arena_parallel_sparse_trie_always_parallel` test variant that exercises all tests with parallelism thresholds set to 1. Added `test_branch_collapse_multi_empty_subtries_blinded_remaining` to cover the case where removing multiple revealed leaves empties their subtries and leaves a single blinded sibling requiring a proof.

View File

@@ -0,0 +1,5 @@
---
reth-trie-sparse: patch
---
Fixed another branch collapse edge case where `check_subtrie_collapse_needs_proof` incorrectly compared removal count against total update count (including `Touched` entries), causing it to skip proof requests for blinded siblings and panic when the subtrie emptied. Added a regression test covering the removals + `Touched` + blinded sibling scenario.

View File

@@ -0,0 +1,5 @@
---
reth-engine-tree: patch
---
Added idle-time pre-computation of account trie upper hashes in the sparse trie payload processor when no pending proof results are available.

View File

@@ -5,3 +5,4 @@ self-hosted-runner:
- depot-ubuntu-latest-4
- depot-ubuntu-latest-8
- depot-ubuntu-latest-16
- available

106
.github/scripts/bench-job-summary.js vendored Normal file
View File

@@ -0,0 +1,106 @@
// Generates a rich GitHub Actions job summary for reth-bench results.
//
// Reads from environment:
// BENCH_WORK_DIR Directory containing summary.json
// BENCH_PR PR number (may be empty)
// BENCH_ACTOR GitHub user who triggered the bench
// BENCH_CORES CPU core limit (0 = all)
// BENCH_WARMUP_BLOCKS Number of warmup blocks
// BENCH_SAMPLY 'true' if samply profiling was enabled
// BENCH_ABBA 'true' if ABBA interleaved order was used
//
// Usage from actions/github-script:
// const jobSummary = require('./.github/scripts/bench-job-summary.js');
// await jobSummary({ core, context, chartSha, grafanaUrl, runId });
const fs = require('fs');
const { verdict, loadSamplyUrls, blocksLabel, metricRows, waitTimeRows } = require('./bench-utils');
module.exports = async function ({ core, context, chartSha, grafanaUrl, runId }) {
let summary;
try {
summary = JSON.parse(fs.readFileSync(process.env.BENCH_WORK_DIR + '/summary.json', 'utf8'));
} catch (e) {
await core.summary.addRaw('⚠️ Benchmark completed but failed to load summary.').write();
return;
}
const repo = `${context.repo.owner}/${context.repo.repo}`;
const prNumber = process.env.BENCH_PR;
const actor = process.env.BENCH_ACTOR;
const commitUrl = `https://github.com/${repo}/commit`;
const { emoji, label } = verdict(summary.changes);
const baselineLink = `[\`${summary.baseline.name}\`](${commitUrl}/${summary.baseline.ref})`;
const featureLink = `[\`${summary.feature.name}\`](${commitUrl}/${summary.feature.ref})`;
const diffUrl = `https://github.com/${repo}/compare/${summary.baseline.ref}...${summary.feature.ref}`;
// Header & metadata
const metaParts = [];
if (prNumber) metaParts.push(`**[PR #${prNumber}](https://github.com/${repo}/pull/${prNumber})**`);
metaParts.push(`triggered by @${actor}`);
let md = `# ${emoji} ${label}\n\n`;
md += metaParts.join(' · ') + '\n\n';
md += `**Baseline:** ${baselineLink}\n`;
md += `**Feature:** ${featureLink} ([diff](${diffUrl}))\n`;
md += blocksLabel(summary).map(p => `**${p.key}:** ${p.value}`).join(' · ') + '\n\n';
// Main comparison table
const rows = metricRows(summary);
md += `| Metric | Baseline | Feature | Change |\n`;
md += `|--------|----------|---------|--------|\n`;
for (const r of rows) {
md += `| ${r.label} | ${r.baseline} | ${r.feature} | ${r.change} |\n`;
}
md += '\n';
// Wait time breakdown
const wtRows = waitTimeRows(summary);
if (wtRows.length > 0) {
md += `### Wait Time Breakdown\n\n`;
md += `| Metric | Baseline | Feature |\n`;
md += `|--------|----------|--------|\n`;
for (const r of wtRows) {
md += `| ${r.title} | ${r.baseline} | ${r.feature} |\n`;
}
md += '\n';
}
// Charts
if (chartSha) {
const prNum = prNumber || '0';
const baseUrl = `https://raw.githubusercontent.com/decofe/reth-bench-charts/${chartSha}/pr/${prNum}/${runId}`;
const charts = [
{ file: 'latency_throughput.png', label: 'Latency, Throughput & Diff' },
{ file: 'wait_breakdown.png', label: 'Wait Time Breakdown' },
{ file: 'gas_vs_latency.png', label: 'Gas vs Latency' },
];
md += `### Charts\n\n`;
for (const chart of charts) {
md += `<details><summary>${chart.label}</summary>\n\n`;
md += `![${chart.label}](${baseUrl}/${chart.file})\n\n`;
md += `</details>\n\n`;
}
}
// Samply profiles
const samplyUrls = loadSamplyUrls(process.env.BENCH_WORK_DIR);
const samplyLinks = Object.entries(samplyUrls).map(([run, url]) => `- **${run}**: [Firefox Profiler](${url})`);
if (samplyLinks.length > 0) {
md += `### Samply Profiles\n\n${samplyLinks.join('\n')}\n\n`;
}
// Grafana
if (grafanaUrl) {
md += `### Grafana Dashboard\n\n[View real-time metrics](${grafanaUrl})\n\n`;
}
// Node errors
try {
const errors = fs.readFileSync(process.env.BENCH_WORK_DIR + '/errors.md', 'utf8');
if (errors.trim()) md += '\n' + errors + '\n';
} catch {}
await core.summary.addRaw(md).write();
};

View File

@@ -11,6 +11,8 @@
# BENCH_WAIT_TIME (duration like 500ms, default empty)
# BENCH_BASELINE_ARGS (extra reth node args for baseline runs)
# BENCH_FEATURE_ARGS (extra reth node args for feature runs)
# BENCH_OTLP_TRACES_ENDPOINT (OTLP HTTP endpoint for traces, e.g. https://host/insert/opentelemetry/v1/traces)
# BENCH_OTLP_LOGS_ENDPOINT (OTLP HTTP endpoint for logs, e.g. https://host/insert/opentelemetry/v1/logs)
set -euo pipefail
LABEL="$1"
@@ -139,6 +141,14 @@ if [ -n "${BENCH_METRICS_ADDR:-}" ]; then
RETH_ARGS+=(--metrics "$BENCH_METRICS_ADDR")
fi
# OTLP traces and logs export
if [ -n "${BENCH_OTLP_TRACES_ENDPOINT:-}" ]; then
RETH_ARGS+=(--tracing-otlp="${BENCH_OTLP_TRACES_ENDPOINT}" --tracing-otlp.service-name=reth-bench)
fi
if [ -n "${BENCH_OTLP_LOGS_ENDPOINT:-}" ]; then
RETH_ARGS+=(--logs-otlp="${BENCH_OTLP_LOGS_ENDPOINT}" --logs-otlp.filter=debug)
fi
# Tracy profiling: add --log.tracy flags and set environment
if [ "${BENCH_TRACY:-off}" != "off" ]; then
RETH_ARGS+=(--log.tracy --log.tracy.filter "${BENCH_TRACY_FILTER:-debug}")
@@ -149,16 +159,29 @@ if [ "${BENCH_TRACY:-off}" != "off" ]; then
fi
fi
SUDO_ENV=()
if [ -n "${OTEL_RESOURCE_ATTRIBUTES:-}" ]; then
SUDO_ENV+=("OTEL_RESOURCE_ATTRIBUTES=${OTEL_RESOURCE_ATTRIBUTES}")
SUDO_ENV+=("OTEL_BSP_MAX_QUEUE_SIZE=65536" "OTEL_BLRP_MAX_QUEUE_SIZE=65536")
fi
# Limit reth memory to 95% of available RAM to prevent OOM kills
TOTAL_MEM_KB=$(awk '/^MemTotal:/ {print $2}' /proc/meminfo)
MEM_LIMIT=$(( TOTAL_MEM_KB * 95 / 100 * 1024 ))
echo "Memory limit: $(( MEM_LIMIT / 1024 / 1024 ))MB (95% of $(( TOTAL_MEM_KB / 1024 ))MB)"
if [ "${BENCH_SAMPLY:-false}" = "true" ]; then
RETH_ARGS+=(--log.samply)
SAMPLY="$(which samply)"
sudo taskset -c "$RETH_CPUS" nice -n -20 \
sudo systemd-run --scope -p MemoryMax="$MEM_LIMIT" -p AllowedCPUs="$RETH_CPUS" \
env "${SUDO_ENV[@]}" nice -n -20 \
"$SAMPLY" record --save-only --presymbolicate --rate 10000 \
--output "$OUTPUT_DIR/samply-profile.json.gz" \
-- "$BINARY" "${RETH_ARGS[@]}" \
> "$LOG" 2>&1 &
else
sudo taskset -c "$RETH_CPUS" nice -n -20 "$BINARY" "${RETH_ARGS[@]}" \
sudo systemd-run --scope -p MemoryMax="$MEM_LIMIT" -p AllowedCPUs="$RETH_CPUS" \
env "${SUDO_ENV[@]}" nice -n -20 "$BINARY" "${RETH_ARGS[@]}" \
> "$LOG" 2>&1 &
fi

132
.github/scripts/bench-scheduled-refs.sh vendored Executable file
View File

@@ -0,0 +1,132 @@
#!/usr/bin/env bash
#
# Resolves baseline and feature refs for nightly regression benchmark runs.
#
# Queries the latest successful scheduled docker.yml run via GitHub API
# to find the commit that built the nightly Docker image. Compares with
# the last successful feature ref (from GH Actions cache) to determine
# baseline, detect staleness, and decide whether to skip.
#
# Usage: bench-nightly-refs.sh [--force]
#
# Outputs (via GITHUB_OUTPUT):
# baseline-ref — commit SHA for baseline
# feature-ref — commit SHA for feature (current nightly)
# should-skip — "true" if no new nightly since last run
# is-stale — "true" if latest nightly build is >24h old
# stale-age-hours — age of the nightly build in hours (only if stale)
# nightly-created — ISO timestamp of the nightly build
#
# Reads:
# .nightly-state/last-feature-ref (from GH Actions cache, may not exist)
#
# Requires: gh (GitHub CLI), jq, date
set -euo pipefail
FORCE="${1:-false}"
REPO="${GITHUB_REPOSITORY:-paradigmxyz/reth}"
# --- Step 1: Query latest successful scheduled docker.yml run ---
echo "::group::Querying latest nightly docker build"
RUNS_JSON=$(gh run list \
-R "$REPO" \
--workflow=docker.yml \
--event=schedule \
--status=completed \
--limit 5 \
--json headSha,createdAt,conclusion)
# Find the most recent successful run
LATEST=$(echo "$RUNS_JSON" | jq -r '[.[] | select(.conclusion == "success")] | first // empty')
if [ -z "$LATEST" ]; then
echo "::error::No successful scheduled docker.yml run found in the last 5 runs"
echo "Runs found: $RUNS_JSON"
exit 1
fi
FEATURE_REF=$(echo "$LATEST" | jq -r '.headSha')
CREATED_AT=$(echo "$LATEST" | jq -r '.createdAt')
echo "Latest nightly commit: $FEATURE_REF"
echo "Built at: $CREATED_AT"
echo "::endgroup::"
# --- Step 2: Staleness check ---
echo "::group::Checking staleness"
NOW_EPOCH=$(date +%s)
# Handle both GNU date (-d) and BSD date (-j -f) for cross-platform compat
CREATED_EPOCH=$(date -d "$CREATED_AT" +%s 2>/dev/null || \
date -j -f "%Y-%m-%dT%H:%M:%SZ" "$CREATED_AT" +%s 2>/dev/null || \
date -j -f "%Y-%m-%dT%T%z" "$CREATED_AT" +%s 2>/dev/null || \
{ echo "::error::Cannot parse date: $CREATED_AT"; exit 1; })
AGE_SECONDS=$(( NOW_EPOCH - CREATED_EPOCH ))
AGE_HOURS=$(( AGE_SECONDS / 3600 ))
IS_STALE="false"
if [ "$AGE_HOURS" -gt 24 ]; then
IS_STALE="true"
echo "::warning::STALE NIGHTLY: Build is ${AGE_HOURS}h old (>24h threshold)"
echo "This indicates the nightly docker build failed — no new image was produced"
else
echo "Nightly build age: ${AGE_HOURS}h (within 24h threshold)"
fi
echo "::endgroup::"
# --- Step 3: Read last successful feature ref from cache ---
echo "::group::Reading cached state"
LAST_FEATURE_REF=""
STATE_FILE=".nightly-state/last-feature-ref"
if [ -f "$STATE_FILE" ]; then
LAST_FEATURE_REF=$(tr -d '[:space:]' < "$STATE_FILE")
echo "Previous feature ref: $LAST_FEATURE_REF"
else
echo "No cached state found (first run)"
fi
echo "::endgroup::"
# --- Step 4: Determine baseline and skip logic ---
echo "::group::Resolving refs"
SHOULD_SKIP="false"
BASELINE_REF="$FEATURE_REF" # default for first run
if [ "$IS_STALE" = "true" ]; then
# Stale = error path, don't skip (will alert and fail downstream)
SHOULD_SKIP="false"
BASELINE_REF="${LAST_FEATURE_REF:-$FEATURE_REF}"
echo "Stale nightly detected — will alert and fail"
elif [ -z "$LAST_FEATURE_REF" ]; then
# First run: baseline = feature (self-comparison to establish baseline)
BASELINE_REF="$FEATURE_REF"
echo "First run — will benchmark nightly against itself to establish baseline"
elif [ "$LAST_FEATURE_REF" = "$FEATURE_REF" ]; then
# No new nightly since last successful run
if [ "$FORCE" = "true" ] || [ "$FORCE" = "--force" ]; then
echo "No new nightly, but force=true — running anyway"
BASELINE_REF="$LAST_FEATURE_REF"
else
SHOULD_SKIP="true"
echo "No new nightly since last run — will skip"
fi
else
# Normal case: new nightly available
BASELINE_REF="$LAST_FEATURE_REF"
echo "New nightly detected"
fi
echo "Baseline: $BASELINE_REF"
echo "Feature: $FEATURE_REF"
echo "Skip: $SHOULD_SKIP"
echo "Stale: $IS_STALE"
echo "::endgroup::"
# --- Step 5: Write outputs ---
{
echo "baseline-ref=$BASELINE_REF"
echo "feature-ref=$FEATURE_REF"
echo "should-skip=$SHOULD_SKIP"
echo "is-stale=$IS_STALE"
echo "stale-age-hours=$AGE_HOURS"
echo "nightly-created=$CREATED_AT"
} >> "$GITHUB_OUTPUT"

View File

@@ -7,6 +7,8 @@
// BENCH_PR PR number (may be empty)
// BENCH_ACTOR GitHub user who triggered the bench
// BENCH_JOB_URL URL to the Actions job page
// BENCH_BASELINE_ARGS Extra CLI args for the baseline reth node
// BENCH_FEATURE_ARGS Extra CLI args for the feature reth node
// BENCH_SAMPLY 'true' if samply profiling was enabled
//
// Usage from actions/github-script:
@@ -16,6 +18,7 @@
const fs = require('fs');
const path = require('path');
const { fmtChange, fmtMs, verdict, loadSamplyUrls, blocksLabel, metricRows, waitTimeRows } = require('./bench-utils');
const SLACK_API = 'https://slack.com/api/chat.postMessage';
@@ -59,41 +62,17 @@ function cell(text) {
return { type: 'raw_text', text: s || ' ' };
}
// Slack shortcodes for verdict (Block Kit header doesn't support unicode emoji)
const SLACK_VERDICT = {
'⚠️': ':warning:',
'❌': ':x:',
'✅': ':white_check_mark:',
'⚪': ':white_circle:',
};
function buildSuccessBlocks({ summary, prNumber, actor, actorSlackId, jobUrl, repo, samplyUrls }) {
const b = summary.baseline.stats;
const f = summary.feature.stats;
const c = summary.changes;
const sigEmoji = { good: '\u2705', bad: '\u274c', neutral: '\u26aa' };
function fmtMs(v) { return v.toFixed(2) + 'ms'; }
function fmtMgas(v) { return v.toFixed(2); }
function fmtS(v) { return v.toFixed(2) + 's'; }
function fmtChange(ch) {
if (!ch.pct && !ch.ci_pct) return ' ';
const pctStr = `${ch.pct >= 0 ? '+' : ''}${ch.pct.toFixed(2)}%`;
const ciStr = ch.ci_pct ? ` (\u00b1${ch.ci_pct.toFixed(2)}%)` : '';
return `${pctStr}${ciStr} ${sigEmoji[ch.sig]}`;
}
// Overall result for header
const vals = Object.values(c);
const hasBad = vals.some(v => v.sig === 'bad');
const hasGood = vals.some(v => v.sig === 'good');
let headerEmoji, headerResult;
if (hasBad && hasGood) {
headerEmoji = ':warning:';
headerResult = 'Mixed Results';
} else if (hasBad) {
headerEmoji = ':x:';
headerResult = 'Regression';
} else if (hasGood) {
headerEmoji = ':white_check_mark:';
headerResult = 'Improvement';
} else {
headerEmoji = ':white_circle:';
headerResult = 'No Difference';
}
const { emoji, label } = verdict(summary.changes);
const headerEmoji = SLACK_VERDICT[emoji] || emoji;
const prUrl = prNumber ? `https://github.com/${repo}/pull/${prNumber}` : '';
const commitUrl = `https://github.com/${repo}/commit`;
@@ -118,21 +97,15 @@ function buildSuccessBlocks({ summary, prNumber, actor, actorSlackId, jobUrl, re
if (fl1) featureLine += ` | <${fl1}|Samply 1>`;
if (fl2) featureLine += ` | <${fl2}|Samply 2>`;
const cores = process.env.BENCH_CORES || '0';
const countsParts = [];
if (summary.big_blocks) {
const gasRamp = summary.gas_ramp_blocks || 0;
if (gasRamp > 0) countsParts.push(`*Gas Ramp:* ${gasRamp}`);
countsParts.push(`*Big Blocks:* ${summary.blocks}`);
} else {
const warmup = summary.warmup_blocks || process.env.BENCH_WARMUP_BLOCKS || '';
if (warmup) countsParts.push(`*Warmup:* ${warmup}`);
countsParts.push(`*Blocks:* ${summary.blocks}`);
}
if (cores !== '0') countsParts.push(`*Cores:* ${cores}`);
const countsLine = countsParts.join(' | ');
const countsLine = blocksLabel(summary).map(p => `*${p.key}:* ${p.value}`).join(' | ');
const sectionText = [metaParts.join(' | '), '', baselineLine, featureLine, countsLine].join('\n');
const baselineArgs = process.env.BENCH_BASELINE_ARGS || '';
const featureArgs = process.env.BENCH_FEATURE_ARGS || '';
const argsLines = [];
if (baselineArgs) argsLines.push(`*Baseline Args:* \`${baselineArgs}\``);
if (featureArgs) argsLines.push(`*Feature Args:* \`${featureArgs}\``);
const sectionText = [metaParts.join(' | '), '', baselineLine, featureLine, ...argsLines, countsLine].join('\n');
// Action buttons
const diffUrl = `https://github.com/${repo}/compare/${summary.baseline.ref}...${summary.feature.ref}`;
@@ -151,10 +124,17 @@ function buildSuccessBlocks({ summary, prNumber, actor, actorSlackId, jobUrl, re
},
];
// Build table rows from shared metricRows
const rows = metricRows(summary);
const tableRows = [
[cell('Metric'), cell('Baseline'), cell('Feature'), cell('Change')],
...rows.map(r => [cell(r.label), cell(r.baseline), cell(r.feature), cell(r.change || ' ')]),
];
const blocks = [
{
type: 'header',
text: { type: 'plain_text', text: `${headerEmoji} ${headerResult}`, emoji: true },
text: { type: 'plain_text', text: `${headerEmoji} ${label}`, emoji: true },
},
{
type: 'section',
@@ -168,16 +148,7 @@ function buildSuccessBlocks({ summary, prNumber, actor, actorSlackId, jobUrl, re
{ align: 'right' },
{ align: 'right' },
],
rows: [
[cell('Metric'), cell('Baseline'), cell('Feature'), cell('Change')],
[cell('Mean'), cell(fmtMs(b.mean_ms)), cell(fmtMs(f.mean_ms)), cell(fmtChange(c.mean))],
[cell('StdDev'), cell(fmtMs(b.stddev_ms)), cell(fmtMs(f.stddev_ms)), cell(' ')],
[cell('P50'), cell(fmtMs(b.p50_ms)), cell(fmtMs(f.p50_ms)), cell(fmtChange(c.p50))],
[cell('P90'), cell(fmtMs(b.p90_ms)), cell(fmtMs(f.p90_ms)), cell(fmtChange(c.p90))],
[cell('P99'), cell(fmtMs(b.p99_ms)), cell(fmtMs(f.p99_ms)), cell(fmtChange(c.p99))],
[cell('Mgas/s'), cell(fmtMgas(b.mean_mgas_s)), cell(fmtMgas(f.mean_mgas_s)), cell(fmtChange(c.mgas_s))],
[cell('Wall Clock'), cell(fmtS(b.wall_clock_s)), cell(fmtS(f.wall_clock_s)), cell(fmtChange(c.wall_clock))],
],
rows: tableRows,
},
{
type: 'actions',
@@ -187,16 +158,12 @@ function buildSuccessBlocks({ summary, prNumber, actor, actorSlackId, jobUrl, re
// Wait times as a separate table block (sent as threaded reply due to Slack one-table limit)
const threadBlocks = [];
const waitTimes = summary.wait_times || {};
const waitKeys = Object.keys(waitTimes);
if (waitKeys.length > 0) {
const waitRows = [
const wtRows = waitTimeRows(summary);
if (wtRows.length > 0) {
const waitTableRows = [
[cell('Wait Time'), cell('Baseline'), cell('Feature')],
...wtRows.map(r => [cell(r.title), cell(r.baseline), cell(r.feature)]),
];
for (const key of waitKeys) {
const wt = waitTimes[key];
waitRows.push([cell(wt.title), cell(fmtMs(wt.baseline.mean_ms)), cell(fmtMs(wt.feature.mean_ms))]);
}
threadBlocks.push({
type: 'table',
column_settings: [
@@ -204,7 +171,7 @@ function buildSuccessBlocks({ summary, prNumber, actor, actorSlackId, jobUrl, re
{ align: 'right' },
{ align: 'right' },
],
rows: waitRows,
rows: waitTableRows,
});
}
@@ -266,16 +233,7 @@ async function success({ core, context }) {
const jobUrl = process.env.BENCH_JOB_URL ||
`${context.serverUrl}/${context.repo.owner}/${context.repo.repo}/actions/runs/${context.runId}`;
// Load samply profile URLs (files exist when samply profiling was enabled)
const samplyUrls = {};
for (const run of ['baseline-1', 'baseline-2', 'feature-1', 'feature-2']) {
try {
const url = fs.readFileSync(
path.join(process.env.BENCH_WORK_DIR, run, 'samply-profile-url.txt'), 'utf8'
).trim();
if (url) samplyUrls[run] = url;
} catch {}
}
const samplyUrls = loadSamplyUrls(process.env.BENCH_WORK_DIR);
const slackUsers = loadSlackUsers(process.env.GITHUB_WORKSPACE || '.');
const actorSlackId = slackUsers[actor];

96
.github/scripts/bench-utils.js vendored Normal file
View File

@@ -0,0 +1,96 @@
// Shared utilities for reth-bench result rendering.
//
// Used by bench-job-summary.js and bench-slack-notify.js.
const fs = require('fs');
const path = require('path');
const SIG_EMOJI = { good: '✅', bad: '❌', neutral: '⚪' };
function fmtMs(v) { return v.toFixed(2) + 'ms'; }
function fmtMgas(v) { return v.toFixed(2); }
function fmtS(v) { return v.toFixed(2) + 's'; }
function fmtChange(ch) {
if (!ch || (!ch.pct && !ch.ci_pct)) return '';
const pctStr = `${ch.pct >= 0 ? '+' : ''}${ch.pct.toFixed(2)}%`;
const ciStr = ch.ci_pct ? `${ch.ci_pct.toFixed(2)}%)` : '';
return `${pctStr}${ciStr} ${SIG_EMOJI[ch.sig]}`;
}
function verdict(changes) {
const vals = Object.values(changes);
const hasBad = vals.some(v => v.sig === 'bad');
const hasGood = vals.some(v => v.sig === 'good');
if (hasBad && hasGood) return { emoji: '⚠️', label: 'Mixed Results' };
if (hasBad) return { emoji: '❌', label: 'Regression' };
if (hasGood) return { emoji: '✅', label: 'Improvement' };
return { emoji: '⚪', label: 'No Difference' };
}
function loadSamplyUrls(workDir) {
const urls = {};
for (const run of ['baseline-1', 'baseline-2', 'feature-1', 'feature-2']) {
try {
const url = fs.readFileSync(path.join(workDir, run, 'samply-profile-url.txt'), 'utf8').trim();
if (url) urls[run] = url;
} catch {}
}
return urls;
}
function blocksLabel(summary) {
const parts = [];
if (summary.big_blocks) {
if (summary.gas_ramp_blocks) parts.push({ key: 'Gas Ramp', value: summary.gas_ramp_blocks });
parts.push({ key: 'Big Blocks', value: summary.blocks });
} else {
const warmup = summary.warmup_blocks || process.env.BENCH_WARMUP_BLOCKS || '';
if (warmup) parts.push({ key: 'Warmup', value: warmup });
parts.push({ key: 'Blocks', value: summary.blocks });
}
const cores = process.env.BENCH_CORES || '0';
if (cores !== '0') parts.push({ key: 'Cores', value: cores });
return parts;
}
// The 7 metric rows shared by all renderers.
// Returns an array of { label, baseline, feature, change } objects.
function metricRows(summary) {
const b = summary.baseline.stats;
const f = summary.feature.stats;
const c = summary.changes;
return [
{ label: 'Mean', baseline: fmtMs(b.mean_ms), feature: fmtMs(f.mean_ms), change: fmtChange(c.mean) },
{ label: 'StdDev', baseline: fmtMs(b.stddev_ms), feature: fmtMs(f.stddev_ms), change: '' },
{ label: 'P50', baseline: fmtMs(b.p50_ms), feature: fmtMs(f.p50_ms), change: fmtChange(c.p50) },
{ label: 'P90', baseline: fmtMs(b.p90_ms), feature: fmtMs(f.p90_ms), change: fmtChange(c.p90) },
{ label: 'P99', baseline: fmtMs(b.p99_ms), feature: fmtMs(f.p99_ms), change: fmtChange(c.p99) },
{ label: 'Mgas/s', baseline: fmtMgas(b.mean_mgas_s), feature: fmtMgas(f.mean_mgas_s), change: fmtChange(c.mgas_s) },
{ label: 'Wall Clock', baseline: fmtS(b.wall_clock_s), feature: fmtS(f.wall_clock_s), change: fmtChange(c.wall_clock) },
];
}
// Wait time rows: one row per metric showing mean values.
function waitTimeRows(summary) {
const waitTimes = summary.wait_times || {};
const rows = [];
for (const key of Object.keys(waitTimes)) {
const wt = waitTimes[key];
rows.push({ title: wt.title, baseline: fmtMs(wt.baseline.mean_ms), feature: fmtMs(wt.feature.mean_ms) });
}
return rows;
}
module.exports = {
SIG_EMOJI,
fmtMs,
fmtMgas,
fmtS,
fmtChange,
verdict,
loadSamplyUrls,
blocksLabel,
metricRows,
waitTimeRows,
};

865
.github/workflows/bench-scheduled.yml vendored Normal file
View File

@@ -0,0 +1,865 @@
# Nightly regression benchmark.
#
# Compares the previous nightly build against the current nightly build to
# detect performance regressions. Runs daily after docker.yml produces a new
# nightly image at 01:00 UTC.
#
# State is persisted between runs via GitHub Actions cache: each successful
# run saves the feature commit SHA so the next run knows what to compare against.
on:
schedule:
- cron: "30 5 * * *" # 06:30 UTC daily
workflow_dispatch:
inputs:
force:
description: "Force run even if no new nightly (bypass skip logic)"
required: false
default: false
type: boolean
no_slack:
description: "Suppress Slack notifications"
required: false
default: true
type: boolean
env:
CARGO_TERM_COLOR: always
RUSTC_WRAPPER: "sccache"
name: bench-scheduled
permissions:
contents: read
actions: read
jobs:
# ---------------------------------------------------------------------------
# Job 1: Resolve nightly refs, check staleness, manage state
# ---------------------------------------------------------------------------
resolve-refs:
name: resolve-refs
runs-on: ubuntu-latest
outputs:
baseline-ref: ${{ steps.refs.outputs.baseline-ref }}
feature-ref: ${{ steps.refs.outputs.feature-ref }}
should-skip: ${{ steps.refs.outputs.should-skip }}
is-stale: ${{ steps.refs.outputs.is-stale }}
stale-age-hours: ${{ steps.refs.outputs.stale-age-hours }}
nightly-created: ${{ steps.refs.outputs.nightly-created }}
steps:
- uses: actions/checkout@v6
with:
sparse-checkout: .github/scripts
sparse-checkout-cone-mode: true
- name: Restore nightly state
id: state-cache
uses: actions/cache/restore@v4
with:
path: .nightly-state
key: bench-scheduled-state-dummy
restore-keys: |
bench-scheduled-state-
- name: Resolve nightly refs
id: refs
env:
GH_TOKEN: ${{ github.token }}
GITHUB_REPOSITORY: ${{ github.repository }}
run: |
FORCE="${{ inputs.force || 'false' }}"
.github/scripts/bench-scheduled-refs.sh "$FORCE"
- name: Alert on stale nightly
if: steps.refs.outputs.is-stale == 'true'
uses: actions/github-script@v8
env:
SLACK_BENCH_BOT_TOKEN: ${{ secrets.SLACK_BENCH_BOT_TOKEN }}
SLACK_BENCH_CHANNEL: ${{ secrets.SLACK_BENCH_CHANNEL }}
with:
script: |
const token = process.env.SLACK_BENCH_BOT_TOKEN;
const channel = process.env.SLACK_BENCH_CHANNEL;
if (!token || !channel) {
core.warning('Slack credentials not set, skipping stale nightly alert');
return;
}
const ageHours = '${{ steps.refs.outputs.stale-age-hours }}';
const created = '${{ steps.refs.outputs.nightly-created }}';
const featureRef = '${{ steps.refs.outputs.feature-ref }}';
const shortSha = featureRef.slice(0, 8);
const repo = '${{ github.repository }}';
const runUrl = `${context.serverUrl}/${repo}/actions/runs/${context.runId}`;
const blocks = [
{
type: 'header',
text: { type: 'plain_text', text: ':rotating_light: Nightly Regression: nightly build is stale', emoji: true },
},
{
type: 'section',
text: {
type: 'mrkdwn',
text: [
'*Nightly regression did not run* — nightly build is stale',
'',
`The latest nightly image was built from a commit that is *${ageHours}h old* (threshold: 24h).`,
`This means today's nightly docker build likely failed and no new image was produced.`,
'',
`Stale commit: \`${shortSha}\` (built at ${created})`,
'',
'*Action required:* Check the <https://github.com/' + repo + '/actions/workflows/docker.yml|docker.yml> workflow for failures.',
].join('\n'),
},
},
{
type: 'actions',
elements: [
{
type: 'button',
text: { type: 'plain_text', text: 'View Run :github:', emoji: true },
url: runUrl,
action_id: 'ci_button',
},
],
},
];
const resp = await fetch('https://slack.com/api/chat.postMessage', {
method: 'POST',
headers: {
'Authorization': `Bearer ${token}`,
'Content-Type': 'application/json',
},
body: JSON.stringify({
channel,
blocks,
text: 'Nightly regression: nightly build is stale',
unfurl_links: false,
}),
});
const data = await resp.json();
if (!data.ok) {
core.warning(`Slack API error: ${JSON.stringify(data)}`);
}
- name: Fail on stale nightly
if: steps.refs.outputs.is-stale == 'true'
run: |
echo "::error::Nightly build is stale (>24h old). Aborting."
exit 1
# ---------------------------------------------------------------------------
# Job 2: Run the benchmark
# ---------------------------------------------------------------------------
bench-scheduled:
needs: resolve-refs
if: |
needs.resolve-refs.outputs.should-skip != 'true' &&
needs.resolve-refs.outputs.is-stale != 'true'
name: bench-scheduled
runs-on: [self-hosted, Linux, X64, available]
timeout-minutes: 120
env:
BENCH_RPC_URL: https://ethereum.reth.rs/rpc
SCHELK_MOUNT: /reth-bench
BENCH_WORK_DIR: ${{ github.workspace }}/bench-work
BENCH_PR: ""
BENCH_ACTOR: "nightly-regression"
BENCH_BLOCKS: "2000"
BENCH_WARMUP_BLOCKS: "500"
BENCH_SAMPLY: "false"
BENCH_CORES: "0"
BENCH_BIG_BLOCKS: "false"
BENCH_RETH_NEW_PAYLOAD: "true"
BENCH_WAIT_TIME: ""
BENCH_BASELINE_ARGS: ""
BENCH_FEATURE_ARGS: ""
BENCH_ABBA: "true"
BENCH_COMMENT_ID: ""
BENCH_NO_SLACK: ${{ github.event_name == 'workflow_dispatch' && inputs.no_slack == true && 'true' || 'false' }}
BENCH_METRICS_ADDR: "127.0.0.1:9100"
BENCH_OTLP_TRACES_ENDPOINT: ${{ secrets.BENCH_OTLP_TRACES_ENDPOINT }}
BENCH_OTLP_LOGS_ENDPOINT: ${{ secrets.BENCH_OTLP_LOGS_ENDPOINT }}
BASELINE_REF: ${{ needs.resolve-refs.outputs.baseline-ref }}
FEATURE_REF: ${{ needs.resolve-refs.outputs.feature-ref }}
steps:
- name: Clean up previous bench-work
run: sudo rm -rf "$BENCH_WORK_DIR" 2>/dev/null || true
- uses: actions/checkout@v6
with:
submodules: true
fetch-depth: 0
ref: ${{ needs.resolve-refs.outputs.feature-ref }}
- name: Resolve job URL
id: job-url
uses: actions/github-script@v8
with:
script: |
const { data: jobs } = await github.rest.actions.listJobsForWorkflowRun({
owner: context.repo.owner,
repo: context.repo.repo,
run_id: context.runId,
});
const job = jobs.jobs.find(j => j.name === 'bench-scheduled');
const jobUrl = job ? job.html_url : `${context.serverUrl}/${context.repo.owner}/${context.repo.repo}/actions/runs/${context.runId}`;
core.exportVariable('BENCH_JOB_URL', jobUrl);
- uses: dtolnay/rust-toolchain@stable
- uses: mozilla-actions/sccache-action@v0.0.9
continue-on-error: true
- name: Install dependencies
env:
DEREK_TOKEN: ${{ secrets.DEREK_TOKEN }}
run: |
mkdir -p "$HOME/.local/bin"
# apt packages
sudo apt-get update -qq
sudo apt-get install -y --no-install-recommends \
python3 make jq zstd curl dmsetup \
linux-tools-"$(uname -r)" || \
sudo apt-get install -y --no-install-recommends linux-tools-generic
# mc (MinIO client)
if ! command -v mc &>/dev/null; then
curl -sSfL https://dl.min.io/client/mc/release/linux-amd64/mc -o "$HOME/.local/bin/mc"
chmod +x "$HOME/.local/bin/mc"
fi
# uv (Python package manager)
if ! command -v uv &>/dev/null; then
curl -LsSf https://astral.sh/uv/install.sh | env UV_INSTALL_DIR="$HOME/.local/bin" sh
fi
# Configure git auth for private repos
git config --global url."https://x-access-token:${DEREK_TOKEN}@github.com/".insteadOf "https://github.com/"
# thin-provisioning-tools (era_invalidate, required by schelk)
if ! command -v era_invalidate &>/dev/null; then
git clone --depth 1 https://github.com/jthornber/thin-provisioning-tools /tmp/tpt
sudo make -C /tmp/tpt install
rm -rf /tmp/tpt
fi
# schelk (snapshot rollback tool, invoked via sudo)
if ! sudo sh -c 'command -v schelk' &>/dev/null; then
cargo install --git https://github.com/tempoxyz/schelk --locked
sudo install "$HOME/.cargo/bin/schelk" /usr/local/bin/
fi
- name: Check dependencies
run: |
export PATH="$HOME/.local/bin:$HOME/.cargo/bin:$PATH"
echo "$HOME/.local/bin" >> "$GITHUB_PATH"
echo "$HOME/.cargo/bin" >> "$GITHUB_PATH"
missing=()
for cmd in mc schelk cpupower taskset stdbuf python3 curl make uv pzstd jq; do
command -v "$cmd" &>/dev/null || missing+=("$cmd")
done
if [ ${#missing[@]} -gt 0 ]; then
echo "::error::Missing required tools: ${missing[*]}"
exit 1
fi
echo "All dependencies found"
- name: Resolve display names
id: refs
run: |
BASELINE_SHORT=$(echo "$BASELINE_REF" | cut -c1-8)
FEATURE_SHORT=$(echo "$FEATURE_REF" | cut -c1-8)
echo "baseline-name=nightly-${BASELINE_SHORT}" >> "$GITHUB_OUTPUT"
echo "feature-name=nightly-${FEATURE_SHORT}" >> "$GITHUB_OUTPUT"
echo "baseline-ref=$BASELINE_REF" >> "$GITHUB_OUTPUT"
echo "feature-ref=$FEATURE_REF" >> "$GITHUB_OUTPUT"
- name: Check if snapshot needs update
id: snapshot-check
run: |
if .github/scripts/bench-reth-snapshot.sh --check; then
echo "needed=false" >> "$GITHUB_OUTPUT"
else
echo "needed=true" >> "$GITHUB_OUTPUT"
fi
- name: Prepare source dirs
run: |
if [ -d ../reth-baseline ]; then
git -C ../reth-baseline fetch origin "$BASELINE_REF"
else
git clone . ../reth-baseline
fi
git -C ../reth-baseline checkout "$BASELINE_REF"
if [ -d ../reth-feature ]; then
git -C ../reth-feature fetch origin "$FEATURE_REF"
else
git clone . ../reth-feature
fi
git -C ../reth-feature checkout "$FEATURE_REF"
- name: Build binaries and download snapshot in parallel
id: build
env:
GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }}
BENCH_REPO: ${{ github.repository }}
SNAPSHOT_NEEDED: ${{ steps.snapshot-check.outputs.needed }}
run: |
BASELINE_DIR="$(cd ../reth-baseline && pwd)"
FEATURE_DIR="$(cd ../reth-feature && pwd)"
.github/scripts/bench-reth-build.sh baseline "${BASELINE_DIR}" "$BASELINE_REF" &
PID_BASELINE=$!
.github/scripts/bench-reth-build.sh feature "${FEATURE_DIR}" "$FEATURE_REF" &
PID_FEATURE=$!
PID_SNAPSHOT=
if [ "$SNAPSHOT_NEEDED" = "true" ]; then
.github/scripts/bench-reth-snapshot.sh &
PID_SNAPSHOT=$!
fi
FAIL=0
wait $PID_BASELINE || FAIL=1
wait $PID_FEATURE || FAIL=1
[ -n "$PID_SNAPSHOT" ] && { wait $PID_SNAPSHOT || FAIL=1; }
if [ $FAIL -ne 0 ]; then
echo "::error::One or more parallel tasks failed (builds / snapshot download)"
exit 1
fi
# System tuning for reproducible benchmarks
- name: System setup
run: |
sudo cpupower frequency-set -g performance || true
# Disable turbo boost (Intel and AMD paths)
echo 1 | sudo tee /sys/devices/system/cpu/intel_pstate/no_turbo 2>/dev/null || true
echo 0 | sudo tee /sys/devices/system/cpu/cpufreq/boost 2>/dev/null || true
sudo swapoff -a || true
echo 0 | sudo tee /proc/sys/kernel/randomize_va_space || true
# Disable SMT (hyperthreading)
for cpu in /sys/devices/system/cpu/cpu*/topology/thread_siblings_list; do
first=$(cut -d, -f1 < "$cpu" | cut -d- -f1)
current=$(echo "$cpu" | grep -o 'cpu[0-9]*' | grep -o '[0-9]*')
if [ "$current" != "$first" ]; then
echo 0 | sudo tee "/sys/devices/system/cpu/cpu${current}/online" || true
fi
done
echo "Online CPUs: $(nproc)"
# Disable transparent huge pages
for p in /sys/kernel/mm/transparent_hugepage /sys/kernel/mm/transparent_hugepages; do
[ -d "$p" ] && echo never | sudo tee "$p/enabled" && echo never | sudo tee "$p/defrag" && break
done || true
# Prevent deep C-states
sudo sh -c 'exec 3<>/dev/cpu_dma_latency; echo -ne "\x00\x00\x00\x00" >&3; sleep infinity' &
# Move all IRQs to core 0
for irq in /proc/irq/*/smp_affinity_list; do
echo 0 | sudo tee "$irq" 2>/dev/null || true
done
# Stop noisy background services
sudo systemctl stop irqbalance cron atd unattended-upgrades snapd 2>/dev/null || true
echo "=== Benchmark environment ==="
uname -r
lscpu | grep -E 'Model name|CPU\(s\)|MHz|NUMA'
cat /sys/devices/system/cpu/cpu0/cpufreq/scaling_governor
cat /sys/devices/system/cpu/cpu0/cpufreq/scaling_cur_freq
cat /sys/kernel/mm/transparent_hugepage/enabled 2>/dev/null || cat /sys/kernel/mm/transparent_hugepages/enabled 2>/dev/null || echo "THP: unknown"
free -h
- name: Pre-flight cleanup
run: |
sudo pkill -9 reth || true
sleep 1
if mountpoint -q "$SCHELK_MOUNT"; then
sudo umount -l "$SCHELK_MOUNT" || true
sudo schelk recover -y || true
fi
rm -rf "$BENCH_WORK_DIR"
mkdir -p "$BENCH_WORK_DIR"
- name: Start metrics proxy
run: |
BENCH_ID="nightly-${{ github.run_id }}"
BENCH_REFERENCE_EPOCH=$(date +%s)
echo "BENCH_ID=${BENCH_ID}" >> "$GITHUB_ENV"
echo "BENCH_REFERENCE_EPOCH=${BENCH_REFERENCE_EPOCH}" >> "$GITHUB_ENV"
LABELS_FILE="/tmp/bench-metrics-labels.json"
echo '{}' > "$LABELS_FILE"
echo "BENCH_LABELS_FILE=${LABELS_FILE}" >> "$GITHUB_ENV"
python3 .github/scripts/bench-metrics-proxy.py \
--labels "$LABELS_FILE" \
--upstream "http://${BENCH_METRICS_ADDR}/" \
--subnet 10.10.0.0/24 \
--port 9090 &
PROXY_PID=$!
echo "BENCH_METRICS_PROXY_PID=${PROXY_PID}" >> "$GITHUB_ENV"
echo "Metrics proxy started (PID $PROXY_PID)"
# Interleaved run order (B-F-F-B) to reduce systematic bias
- name: "Run benchmark: baseline (1/2)"
id: run-baseline-1
run: |
cat > "$BENCH_LABELS_FILE" <<LABELS
{"benchmark_run":"baseline-1","run_type":"baseline","git_ref":"${BASELINE_REF}","bench_sha":"${BASELINE_REF}","benchmark_id":"${BENCH_ID}","run_start_epoch":"$(date +%s)","reference_epoch":"${BENCH_REFERENCE_EPOCH}"}
LABELS
taskset -c 0 .github/scripts/bench-reth-run.sh baseline ../reth-baseline/target/profiling/reth "$BENCH_WORK_DIR/baseline-1"
- name: "Run benchmark: feature (1/2)"
id: run-feature-1
run: |
cat > "$BENCH_LABELS_FILE" <<LABELS
{"benchmark_run":"feature-1","run_type":"feature","git_ref":"${FEATURE_REF}","bench_sha":"${FEATURE_REF}","benchmark_id":"${BENCH_ID}","run_start_epoch":"$(date +%s)","reference_epoch":"${BENCH_REFERENCE_EPOCH}"}
LABELS
taskset -c 0 .github/scripts/bench-reth-run.sh feature ../reth-feature/target/profiling/reth "$BENCH_WORK_DIR/feature-1"
- name: "Run benchmark: feature (2/2)"
id: run-feature-2
run: |
cat > "$BENCH_LABELS_FILE" <<LABELS
{"benchmark_run":"feature-2","run_type":"feature","git_ref":"${FEATURE_REF}","bench_sha":"${FEATURE_REF}","benchmark_id":"${BENCH_ID}","run_start_epoch":"$(date +%s)","reference_epoch":"${BENCH_REFERENCE_EPOCH}"}
LABELS
taskset -c 0 .github/scripts/bench-reth-run.sh feature ../reth-feature/target/profiling/reth "$BENCH_WORK_DIR/feature-2"
- name: "Run benchmark: baseline (2/2)"
id: run-baseline-2
run: |
LAST_RUN_START=$(date +%s)
echo "BENCH_LAST_RUN_START=${LAST_RUN_START}" >> "$GITHUB_ENV"
cat > "$BENCH_LABELS_FILE" <<LABELS
{"benchmark_run":"baseline-2","run_type":"baseline","git_ref":"${BASELINE_REF}","bench_sha":"${BASELINE_REF}","benchmark_id":"${BENCH_ID}","run_start_epoch":"${LAST_RUN_START}","reference_epoch":"${BENCH_REFERENCE_EPOCH}"}
LABELS
taskset -c 0 .github/scripts/bench-reth-run.sh baseline ../reth-baseline/target/profiling/reth "$BENCH_WORK_DIR/baseline-2"
- name: Stop metrics proxy & generate Grafana URL
id: metrics
if: "!cancelled()"
run: |
kill "$BENCH_METRICS_PROXY_PID" 2>/dev/null || true
LAST_RUN_DURATION=$(( $(date +%s) - BENCH_LAST_RUN_START ))
FROM_MS=$(( BENCH_REFERENCE_EPOCH * 1000 ))
TO_MS=$(( (BENCH_REFERENCE_EPOCH + LAST_RUN_DURATION) * 1000 ))
GRAFANA_URL="https://tempoxyz.grafana.net/d/reth-bench-ghr/reth-bench-ghr?orgId=1&from=${FROM_MS}&to=${TO_MS}&timezone=browser&var-datasource=ef57fux92e9z4e&var-job=reth-bench&var-benchmark_id=${BENCH_ID}&var-benchmark_run=\$__all"
echo "grafana-url=${GRAFANA_URL}" >> "$GITHUB_OUTPUT"
echo "Grafana URL: ${GRAFANA_URL}"
- name: Scan logs for errors
if: "!cancelled()"
run: |
ERRORS_FILE="$BENCH_WORK_DIR/errors.md"
found=false
for run_dir in baseline-1 feature-1 feature-2 baseline-2; do
LOG="$BENCH_WORK_DIR/$run_dir/node.log"
if [ ! -f "$LOG" ]; then continue; fi
panics=$(grep -c -E 'panicked at' "$LOG" || true)
errors=$(grep -c ' ERROR ' "$LOG" || true)
if [ "$panics" -gt 0 ] || [ "$errors" -gt 0 ]; then
if [ "$found" = false ]; then
printf '### ⚠️ Node Errors\n\n' >> "$ERRORS_FILE"
found=true
fi
printf '<details><summary><b>%s</b>: %d panic(s), %d error(s)</summary>\n\n' "$run_dir" "$panics" "$errors" >> "$ERRORS_FILE"
if [ "$panics" -gt 0 ]; then
printf '**Panics:**\n```\n' >> "$ERRORS_FILE"
grep -E 'panicked at' "$LOG" | head -10 >> "$ERRORS_FILE"
printf '```\n' >> "$ERRORS_FILE"
fi
if [ "$errors" -gt 0 ]; then
printf '**Errors (first 20):**\n```\n' >> "$ERRORS_FILE"
grep ' ERROR ' "$LOG" | head -20 >> "$ERRORS_FILE"
printf '```\n' >> "$ERRORS_FILE"
fi
printf '\n</details>\n\n' >> "$ERRORS_FILE"
fi
done
- name: Parse results
id: results
if: success()
env:
BASELINE_NAME: ${{ steps.refs.outputs.baseline-name }}
FEATURE_NAME: ${{ steps.refs.outputs.feature-name }}
run: |
SUMMARY_ARGS="--output-summary $BENCH_WORK_DIR/summary.json"
SUMMARY_ARGS="$SUMMARY_ARGS --output-markdown $BENCH_WORK_DIR/comment.md"
SUMMARY_ARGS="$SUMMARY_ARGS --repo ${{ github.repository }}"
SUMMARY_ARGS="$SUMMARY_ARGS --baseline-ref ${BASELINE_REF}"
SUMMARY_ARGS="$SUMMARY_ARGS --baseline-name ${BASELINE_NAME}"
SUMMARY_ARGS="$SUMMARY_ARGS --feature-name ${FEATURE_NAME}"
SUMMARY_ARGS="$SUMMARY_ARGS --feature-ref ${FEATURE_REF}"
BASELINE_CSVS="$BENCH_WORK_DIR/baseline-1/combined_latency.csv"
FEATURE_CSVS="$BENCH_WORK_DIR/feature-1/combined_latency.csv"
BASELINE_CSVS="$BASELINE_CSVS $BENCH_WORK_DIR/baseline-2/combined_latency.csv"
FEATURE_CSVS="$FEATURE_CSVS $BENCH_WORK_DIR/feature-2/combined_latency.csv"
SUMMARY_ARGS="$SUMMARY_ARGS --baseline-csv $BASELINE_CSVS"
SUMMARY_ARGS="$SUMMARY_ARGS --feature-csv $FEATURE_CSVS"
SUMMARY_ARGS="$SUMMARY_ARGS --gas-csv $BENCH_WORK_DIR/feature-1/total_gas.csv"
GRAFANA_URL='${{ steps.metrics.outputs.grafana-url }}'
if [ -n "$GRAFANA_URL" ]; then
SUMMARY_ARGS="$SUMMARY_ARGS --grafana-url $GRAFANA_URL"
fi
# shellcheck disable=SC2086
python3 .github/scripts/bench-reth-summary.py $SUMMARY_ARGS
- name: Generate charts
if: success()
env:
BASELINE_NAME: ${{ steps.refs.outputs.baseline-name }}
FEATURE_NAME: ${{ steps.refs.outputs.feature-name }}
run: |
CHART_ARGS="--output-dir $BENCH_WORK_DIR/charts"
FEATURE_CSVS="$BENCH_WORK_DIR/feature-1/combined_latency.csv"
BASELINE_CSVS="$BENCH_WORK_DIR/baseline-1/combined_latency.csv"
FEATURE_CSVS="$FEATURE_CSVS $BENCH_WORK_DIR/feature-2/combined_latency.csv"
BASELINE_CSVS="$BASELINE_CSVS $BENCH_WORK_DIR/baseline-2/combined_latency.csv"
CHART_ARGS="$CHART_ARGS --feature $FEATURE_CSVS"
CHART_ARGS="$CHART_ARGS --baseline $BASELINE_CSVS"
CHART_ARGS="$CHART_ARGS --baseline-name ${BASELINE_NAME}"
CHART_ARGS="$CHART_ARGS --feature-name ${FEATURE_NAME}"
# shellcheck disable=SC2086
uv run --with matplotlib python3 .github/scripts/bench-reth-charts.py $CHART_ARGS
- name: Upload results
if: "!cancelled()"
uses: actions/upload-artifact@v7
with:
name: bench-scheduled-results
path: ${{ env.BENCH_WORK_DIR }}
- name: Push charts
id: push-charts
if: success()
run: |
RUN_ID=${{ github.run_id }}
CHART_DIR="nightly/${RUN_ID}"
CHARTS_REPO="https://x-access-token:${{ secrets.DEREK_TOKEN }}@github.com/decofe/reth-bench-charts.git"
TMP_DIR=$(mktemp -d)
if git clone --depth 1 "${CHARTS_REPO}" "${TMP_DIR}" 2>/dev/null; then
true
else
git init "${TMP_DIR}"
git -C "${TMP_DIR}" remote add origin "${CHARTS_REPO}"
fi
mkdir -p "${TMP_DIR}/${CHART_DIR}"
cp "$BENCH_WORK_DIR"/charts/*.png "${TMP_DIR}/${CHART_DIR}/"
git -C "${TMP_DIR}" add "${CHART_DIR}"
git -C "${TMP_DIR}" -c user.name="github-actions" -c user.email="github-actions@github.com" \
commit -m "nightly bench charts for run ${RUN_ID}"
git -C "${TMP_DIR}" push origin HEAD:main
echo "sha=$(git -C "${TMP_DIR}" rev-parse HEAD)" >> "$GITHUB_OUTPUT"
rm -rf "${TMP_DIR}"
- name: Write job summary
if: success()
uses: actions/github-script@v8
with:
script: |
const fs = require('fs');
const { verdict, metricRows, waitTimeRows, blocksLabel } = require('./.github/scripts/bench-utils');
let summary;
try {
summary = JSON.parse(fs.readFileSync(process.env.BENCH_WORK_DIR + '/summary.json', 'utf8'));
} catch (e) {
await core.summary.addRaw('⚠️ Benchmark completed but failed to load summary.').write();
return;
}
const repo = `${context.repo.owner}/${context.repo.repo}`;
const commitUrl = `https://github.com/${repo}/commit`;
const { emoji, label } = verdict(summary.changes);
const baselineLink = `[\`${summary.baseline.name}\`](${commitUrl}/${summary.baseline.ref})`;
const featureLink = `[\`${summary.feature.name}\`](${commitUrl}/${summary.feature.ref})`;
const diffUrl = `https://github.com/${repo}/compare/${summary.baseline.ref}...${summary.feature.ref}`;
let md = `# ${emoji} Nightly Regression: ${label}\n\n`;
md += `**Baseline:** ${baselineLink}\n`;
md += `**Feature:** ${featureLink} ([diff](${diffUrl}))\n`;
md += blocksLabel(summary).map(p => `**${p.key}:** ${p.value}`).join(' · ') + '\n\n';
const rows = metricRows(summary);
md += `| Metric | Baseline | Feature | Change |\n`;
md += `|--------|----------|---------|--------|\n`;
for (const r of rows) {
md += `| ${r.label} | ${r.baseline} | ${r.feature} | ${r.change} |\n`;
}
md += '\n';
const wtRows = waitTimeRows(summary);
if (wtRows.length > 0) {
md += `### Wait Time Breakdown\n\n`;
md += `| Metric | Baseline | Feature |\n`;
md += `|--------|----------|--------|\n`;
for (const r of wtRows) {
md += `| ${r.title} | ${r.baseline} | ${r.feature} |\n`;
}
md += '\n';
}
// Charts
const chartSha = '${{ steps.push-charts.outputs.sha }}';
if (chartSha) {
const runId = '${{ github.run_id }}';
const baseUrl = `https://raw.githubusercontent.com/decofe/reth-bench-charts/${chartSha}/nightly/${runId}`;
const charts = [
{ file: 'latency_throughput.png', label: 'Latency, Throughput & Diff' },
{ file: 'wait_breakdown.png', label: 'Wait Time Breakdown' },
{ file: 'gas_vs_latency.png', label: 'Gas vs Latency' },
];
md += `### Charts\n\n`;
for (const chart of charts) {
md += `<details><summary>${chart.label}</summary>\n\n`;
md += `![${chart.label}](${baseUrl}/${chart.file})\n\n`;
md += `</details>\n\n`;
}
}
const grafanaUrl = '${{ steps.metrics.outputs.grafana-url }}';
if (grafanaUrl) {
md += `### Grafana Dashboard\n\n[View real-time metrics](${grafanaUrl})\n\n`;
}
try {
const errors = fs.readFileSync(process.env.BENCH_WORK_DIR + '/errors.md', 'utf8');
if (errors.trim()) md += '\n' + errors + '\n';
} catch {}
await core.summary.addRaw(md).write();
- name: Send Slack notification (success)
if: success() && env.BENCH_NO_SLACK != 'true'
uses: actions/github-script@v8
env:
SLACK_BENCH_BOT_TOKEN: ${{ secrets.SLACK_BENCH_BOT_TOKEN }}
SLACK_BENCH_CHANNEL: ${{ secrets.SLACK_BENCH_CHANNEL }}
with:
script: |
const fs = require('fs');
const { verdict, fmtChange, fmtMs, metricRows, waitTimeRows, blocksLabel } = require('./.github/scripts/bench-utils');
const token = process.env.SLACK_BENCH_BOT_TOKEN;
const channel = process.env.SLACK_BENCH_CHANNEL;
if (!token || !channel) {
core.info('Slack credentials not set, skipping notification');
return;
}
let summary;
try {
summary = JSON.parse(fs.readFileSync(process.env.BENCH_WORK_DIR + '/summary.json', 'utf8'));
} catch (e) {
core.warning('Could not read summary.json for Slack notification');
return;
}
// Only notify on significant changes (regression OR improvement)
const changes = summary.changes || {};
const hasSignificant = Object.values(changes).some(c => c.sig === 'good' || c.sig === 'bad');
if (!hasSignificant) {
core.info('No significant changes detected, skipping nightly Slack notification');
return;
}
const SLACK_VERDICT = {
'⚠️': ':warning:',
'❌': ':x:',
'✅': ':white_check_mark:',
'⚪': ':white_circle:',
};
const repo = `${context.repo.owner}/${context.repo.repo}`;
const { emoji, label } = verdict(changes);
const headerEmoji = SLACK_VERDICT[emoji] || emoji;
const commitUrl = `https://github.com/${repo}/commit`;
const baselineLink = `<${commitUrl}/${summary.baseline.ref}|${summary.baseline.name}>`;
const featureLink = `<${commitUrl}/${summary.feature.ref}|${summary.feature.name}>`;
const diffUrl = `https://github.com/${repo}/compare/${summary.baseline.ref}...${summary.feature.ref}`;
const jobUrl = process.env.BENCH_JOB_URL || `${context.serverUrl}/${repo}/actions/runs/${context.runId}`;
function cell(text) { return { type: 'raw_text', text: String(text) || ' ' }; }
const sectionText = [
'*Nightly Regression*',
'',
`*Baseline:* ${baselineLink}`,
`*Feature:* ${featureLink}`,
blocksLabel(summary).map(p => `*${p.key}:* ${p.value}`).join(' | '),
].join('\n');
const rows = metricRows(summary);
const tableRows = [
[cell('Metric'), cell('Baseline'), cell('Feature'), cell('Change')],
...rows.map(r => [cell(r.label), cell(r.baseline), cell(r.feature), cell(r.change || ' ')]),
];
const blocks = [
{
type: 'header',
text: { type: 'plain_text', text: `${headerEmoji} Nightly: ${label}`, emoji: true },
},
{
type: 'section',
text: { type: 'mrkdwn', text: sectionText },
},
{
type: 'table',
column_settings: [{ align: 'left' }, { align: 'right' }, { align: 'right' }, { align: 'right' }],
rows: tableRows,
},
{
type: 'actions',
elements: [
{
type: 'button',
text: { type: 'plain_text', text: 'CI :github:', emoji: true },
url: jobUrl,
action_id: 'ci_button',
},
{
type: 'button',
text: { type: 'plain_text', text: 'Diff :github:', emoji: true },
url: diffUrl,
action_id: 'diff_button',
},
],
},
];
const text = `Nightly Regression: ${summary.baseline.name} vs ${summary.feature.name}`;
const resp = await fetch('https://slack.com/api/chat.postMessage', {
method: 'POST',
headers: {
'Authorization': `Bearer ${token}`,
'Content-Type': 'application/json',
},
body: JSON.stringify({ channel, blocks, text, unfurl_links: false }),
});
const data = await resp.json();
if (!data.ok) {
core.warning(`Slack API error: ${JSON.stringify(data)}`);
return;
}
// Post wait time breakdown as threaded reply
const wtRows = waitTimeRows(summary);
if (data.ts && wtRows.length > 0) {
const waitTableRows = [
[cell('Wait Time'), cell('Baseline'), cell('Feature')],
...wtRows.map(r => [cell(r.title), cell(r.baseline), cell(r.feature)]),
];
await fetch('https://slack.com/api/chat.postMessage', {
method: 'POST',
headers: {
'Authorization': `Bearer ${token}`,
'Content-Type': 'application/json',
},
body: JSON.stringify({
channel,
thread_ts: data.ts,
blocks: [{
type: 'table',
column_settings: [{ align: 'left' }, { align: 'right' }, { align: 'right' }],
rows: waitTableRows,
}],
text: 'Wait time breakdown',
unfurl_links: false,
}),
});
}
- name: Send Slack notification (failure)
if: failure()
uses: actions/github-script@v8
env:
SLACK_BENCH_BOT_TOKEN: ${{ secrets.SLACK_BENCH_BOT_TOKEN }}
SLACK_BENCH_CHANNEL: ${{ secrets.SLACK_BENCH_CHANNEL }}
with:
script: |
const token = process.env.SLACK_BENCH_BOT_TOKEN;
const channel = process.env.SLACK_BENCH_CHANNEL;
if (!token || !channel) return;
const steps_status = [
['building binaries${{ steps.snapshot-check.outputs.needed == 'true' && ' & downloading snapshot' || '' }}', '${{ steps.build.outcome }}'],
['running baseline benchmark (1/2)', '${{ steps.run-baseline-1.outcome }}'],
['running feature benchmark (1/2)', '${{ steps.run-feature-1.outcome }}'],
['running feature benchmark (2/2)', '${{ steps.run-feature-2.outcome }}'],
['running baseline benchmark (2/2)', '${{ steps.run-baseline-2.outcome }}'],
];
const failed = steps_status.find(([, o]) => o === 'failure');
const failedStep = failed ? failed[0] : 'unknown step';
const repo = `${context.repo.owner}/${context.repo.repo}`;
const jobUrl = process.env.BENCH_JOB_URL || `${context.serverUrl}/${repo}/actions/runs/${context.runId}`;
const blocks = [
{
type: 'header',
text: { type: 'plain_text', text: ':rotating_light: Nightly Bench Failed', emoji: true },
},
{
type: 'section',
text: { type: 'mrkdwn', text: `*Nightly regression* failed while *${failedStep}*` },
},
{
type: 'actions',
elements: [{
type: 'button',
text: { type: 'plain_text', text: 'View Logs :github:', emoji: true },
url: jobUrl,
action_id: 'ci_button',
}],
},
];
await fetch('https://slack.com/api/chat.postMessage', {
method: 'POST',
headers: {
'Authorization': `Bearer ${token}`,
'Content-Type': 'application/json',
},
body: JSON.stringify({
channel,
blocks,
text: `Nightly bench failed while ${failedStep}`,
unfurl_links: false,
}),
});
- name: Restore system settings
if: always()
run: |
sudo systemctl start irqbalance cron atd 2>/dev/null || true
# ---------------------------------------------------------------------------
# Job 3: Save state on success
# ---------------------------------------------------------------------------
save-state:
needs: [resolve-refs, bench-scheduled]
if: success()
name: save-state
runs-on: ubuntu-latest
steps:
- name: Write state file
run: |
mkdir -p .nightly-state
echo "${{ needs.resolve-refs.outputs.feature-ref }}" > .nightly-state/last-feature-ref
- name: Save nightly state
uses: actions/cache/save@v4
with:
path: .nightly-state
key: bench-scheduled-state-${{ needs.resolve-refs.outputs.feature-ref }}

View File

@@ -71,6 +71,11 @@ on:
required: false
default: "true"
type: boolean
otlp:
description: "Export OTLP traces and logs"
required: false
default: "true"
type: boolean
env:
CARGO_TERM_COLOR: always
@@ -110,6 +115,7 @@ jobs:
baseline-args: ${{ steps.args.outputs.baseline-args }}
feature-args: ${{ steps.args.outputs.feature-args }}
abba: ${{ steps.args.outputs.abba }}
otlp: ${{ steps.args.outputs.otlp }}
comment-id: ${{ steps.ack.outputs.comment-id }}
steps:
- name: Check org membership
@@ -151,6 +157,7 @@ jobs:
bigBlocks = blocks === 'big' ? 'true' : 'false';
var rethNewPayload = '${{ github.event.inputs.reth_newPayload }}' !== 'false' ? 'true' : 'false';
var abba = '${{ github.event.inputs.abba }}' !== 'false' ? 'true' : 'false';
var otlp = '${{ github.event.inputs.otlp }}' !== 'false' ? 'true' : 'false';
var waitTime = '${{ github.event.inputs.wait_time }}' || '';
var baselineNodeArgs = '${{ github.event.inputs.baseline_args }}' || '';
var featureNodeArgs = '${{ github.event.inputs.feature_args }}' || '';
@@ -177,10 +184,10 @@ jobs:
const intOrKeywordArgs = new Map([['blocks', new Set(['big'])]]);
const refArgs = new Set(['baseline', 'feature']);
const boolArgs = new Set(['samply', 'no-slack']);
const boolDefaultTrue = new Set(['reth_newPayload', 'abba']);
const boolDefaultTrue = new Set(['reth_newPayload', 'abba', 'otlp']);
const durationArgs = new Set(['wait-time']);
const stringArgs = new Set(['baseline-args', 'feature-args']);
const defaults = { blocks: '500', warmup: '100', baseline: '', feature: '', samply: 'false', 'no-slack': 'false', cores: '0', reth_newPayload: 'true', abba: 'true', 'wait-time': '', 'baseline-args': '', 'feature-args': '' };
const defaults = { blocks: '500', warmup: '100', baseline: '', feature: '', samply: 'false', 'no-slack': 'false', cores: '0', reth_newPayload: 'true', abba: 'true', otlp: 'true', 'wait-time': '', 'baseline-args': '', 'feature-args': '' };
const unknown = [];
const invalid = [];
const args = body.replace(/^(?:@decofe|derek) bench\s*/, '');
@@ -250,7 +257,7 @@ jobs:
if (unknown.length) errors.push(`Unknown argument(s): \`${unknown.join('`, `')}\``);
if (invalid.length) errors.push(`Invalid value(s): ${invalid.join(', ')}`);
if (errors.length) {
const msg = `❌ **Invalid bench command**\n\n${errors.join('\n')}\n\n**Usage:** \`@decofe bench [blocks=N|big] [warmup=N] [baseline=REF] [feature=REF] [samply] [no-slack] [cores=N] [reth_newPayload=true|false] [abba=true|false] [wait-time=DURATION] [baseline-args="..."] [feature-args="..."]\``;
const msg = `❌ **Invalid bench command**\n\n${errors.join('\n')}\n\n**Usage:** \`@decofe bench [blocks=N|big] [warmup=N] [baseline=REF] [feature=REF] [samply] [no-slack] [cores=N] [reth_newPayload=true|false] [abba=true|false] [otlp=true|false] [wait-time=DURATION] [baseline-args="..."] [feature-args="..."]\``;
await github.rest.issues.createComment({
owner: context.repo.owner,
repo: context.repo.repo,
@@ -270,6 +277,7 @@ jobs:
bigBlocks = blocks === 'big' ? 'true' : 'false';
var rethNewPayload = defaults.reth_newPayload;
var abba = defaults.abba;
var otlp = defaults.otlp;
var waitTime = defaults['wait-time'];
var baselineNodeArgs = defaults['baseline-args'];
var featureNodeArgs = defaults['feature-args'];
@@ -308,6 +316,7 @@ jobs:
core.setOutput('baseline-args', baselineNodeArgs);
core.setOutput('feature-args', featureNodeArgs);
core.setOutput('abba', abba);
core.setOutput('otlp', otlp);
- name: Acknowledge request
id: ack
@@ -375,6 +384,8 @@ jobs:
const rethNPNote = !rethNP ? ', reth_newPayload: `disabled`' : '';
const abbaEnabled = '${{ steps.args.outputs.abba }}' !== 'false';
const abbaNote = !abbaEnabled ? ', abba: `disabled`' : '';
const otlpEnabled = '${{ steps.args.outputs.otlp }}' !== 'false';
const otlpNote = !otlpEnabled ? ', otlp: `disabled`' : '';
const waitTimeVal = '${{ steps.args.outputs.wait-time }}';
const waitTimeNote = waitTimeVal ? `, wait-time: \`${waitTimeVal}\`` : '';
const baselineArgsVal = '${{ steps.args.outputs.baseline-args }}';
@@ -382,7 +393,7 @@ jobs:
const featureArgsVal = '${{ steps.args.outputs.feature-args }}';
const featureArgsNote = featureArgsVal ? `, feature-args: \`${featureArgsVal}\`` : '';
const blocksDesc = bigBlocks ? 'blocks: `big`' : `${blocks} blocks, ${warmup} warmup blocks`;
const config = `**Config:** ${blocksDesc}, baseline: \`${baseline}\`, feature: \`${feature}\`${samplyNote}${noSlackNote}${coresNote}${rethNPNote}${abbaNote}${waitTimeNote}${baselineArgsNote}${featureArgsNote}`;
const config = `**Config:** ${blocksDesc}, baseline: \`${baseline}\`, feature: \`${feature}\`${samplyNote}${noSlackNote}${coresNote}${rethNPNote}${abbaNote}${otlpNote}${waitTimeNote}${baselineArgsNote}${featureArgsNote}`;
const { data: comment } = await github.rest.issues.createComment({
owner: context.repo.owner,
@@ -417,6 +428,8 @@ jobs:
const rethNPNote = !rethNP ? ', reth_newPayload: `disabled`' : '';
const abbaEnabled = '${{ steps.args.outputs.abba }}' !== 'false';
const abbaNote = !abbaEnabled ? ', abba: `disabled`' : '';
const otlpEnabled = '${{ steps.args.outputs.otlp }}' !== 'false';
const otlpNote = !otlpEnabled ? ', otlp: `disabled`' : '';
const waitTimeVal = '${{ steps.args.outputs.wait-time }}';
const waitTimeNote = waitTimeVal ? `, wait-time: \`${waitTimeVal}\`` : '';
const baselineArgsVal = '${{ steps.args.outputs.baseline-args }}';
@@ -424,7 +437,7 @@ jobs:
const featureArgsVal = '${{ steps.args.outputs.feature-args }}';
const featureArgsNote = featureArgsVal ? `, feature-args: \`${featureArgsVal}\`` : '';
const blocksDesc = bigBlocks ? 'blocks: `big`' : `${blocks} blocks, ${warmup} warmup blocks`;
const config = `**Config:** ${blocksDesc}, baseline: \`${baseline}\`, feature: \`${feature}\`${samplyNote}${noSlackNote}${coresNote}${rethNPNote}${abbaNote}${waitTimeNote}${baselineArgsNote}${featureArgsNote}`;
const config = `**Config:** ${blocksDesc}, baseline: \`${baseline}\`, feature: \`${feature}\`${samplyNote}${noSlackNote}${coresNote}${rethNPNote}${abbaNote}${otlpNote}${waitTimeNote}${baselineArgsNote}${featureArgsNote}`;
const runUrl = `${context.serverUrl}/${context.repo.owner}/${context.repo.repo}/actions/runs/${context.runId}`;
const numRunners = parseInt(process.env.BENCH_RUNNERS) || 1;
@@ -476,7 +489,7 @@ jobs:
reth-bench:
needs: reth-bench-ack
name: reth-bench
runs-on: [self-hosted, Linux, X64]
runs-on: [self-hosted, Linux, X64, available]
timeout-minutes: 120
env:
BENCH_RPC_URL: https://ethereum.reth.rs/rpc
@@ -494,9 +507,12 @@ jobs:
BENCH_BASELINE_ARGS: ${{ needs.reth-bench-ack.outputs.baseline-args }}
BENCH_FEATURE_ARGS: ${{ needs.reth-bench-ack.outputs.feature-args }}
BENCH_ABBA: ${{ needs.reth-bench-ack.outputs.abba }}
BENCH_OTLP: ${{ needs.reth-bench-ack.outputs.otlp }}
BENCH_COMMENT_ID: ${{ needs.reth-bench-ack.outputs.comment-id }}
BENCH_NO_SLACK: ${{ needs.reth-bench-ack.outputs.no-slack }}
BENCH_METRICS_ADDR: "127.0.0.1:9100"
BENCH_OTLP_TRACES_ENDPOINT: ${{ needs.reth-bench-ack.outputs.otlp != 'false' && secrets.BENCH_OTLP_TRACES_ENDPOINT || '' }}
BENCH_OTLP_LOGS_ENDPOINT: ${{ needs.reth-bench-ack.outputs.otlp != 'false' && secrets.BENCH_OTLP_LOGS_ENDPOINT || '' }}
steps:
- name: Clean up previous bench-work
run: sudo rm -rf "$BENCH_WORK_DIR" 2>/dev/null || true
@@ -557,6 +573,8 @@ jobs:
const rethNPNote = !rethNP ? ', reth_newPayload: `disabled`' : '';
const abbaEnabled = (process.env.BENCH_ABBA || 'true') !== 'false';
const abbaNote = !abbaEnabled ? ', abba: `disabled`' : '';
const otlpEnabled = (process.env.BENCH_OTLP || 'true') !== 'false';
const otlpNote = !otlpEnabled ? ', otlp: `disabled`' : '';
const waitTimeVal = process.env.BENCH_WAIT_TIME || '';
const waitTimeNote = waitTimeVal ? `, wait-time: \`${waitTimeVal}\`` : '';
const baselineArgsVal = process.env.BENCH_BASELINE_ARGS || '';
@@ -564,7 +582,7 @@ jobs:
const featureArgsVal = process.env.BENCH_FEATURE_ARGS || '';
const featureArgsNote = featureArgsVal ? `, feature-args: \`${featureArgsVal}\`` : '';
const blocksDesc = bigBlocks ? 'blocks: `big`' : `${blocks} blocks, ${warmup} warmup blocks`;
core.exportVariable('BENCH_CONFIG', `**Config:** ${blocksDesc}, baseline: \`${baseline}\`, feature: \`${feature}\`${samplyNote}${noSlackNote}${coresNote}${rethNPNote}${abbaNote}${waitTimeNote}${baselineArgsNote}${featureArgsNote}`);
core.exportVariable('BENCH_CONFIG', `**Config:** ${blocksDesc}, baseline: \`${baseline}\`, feature: \`${feature}\`${samplyNote}${noSlackNote}${coresNote}${rethNPNote}${abbaNote}${otlpNote}${waitTimeNote}${baselineArgsNote}${featureArgsNote}`);
const { buildBody } = require('./.github/scripts/bench-update-status.js');
await github.rest.issues.updateComment({
@@ -878,6 +896,7 @@ jobs:
id: run-baseline-1
env:
BASELINE_REF: ${{ steps.refs.outputs.baseline-ref }}
OTEL_RESOURCE_ATTRIBUTES: "benchmark_id=${{ env.BENCH_ID }},benchmark_run=baseline-1,run_type=baseline,git_ref=${{ steps.refs.outputs.baseline-ref }}"
run: |
cat > "$BENCH_LABELS_FILE" <<LABELS
{"benchmark_run":"baseline-1","run_type":"baseline","git_ref":"${BASELINE_REF}","bench_sha":"${BASELINE_REF}","benchmark_id":"${BENCH_ID}","run_start_epoch":"$(date +%s)","reference_epoch":"${BENCH_REFERENCE_EPOCH}"}
@@ -888,6 +907,7 @@ jobs:
id: run-feature-1
env:
FEATURE_REF: ${{ steps.refs.outputs.feature-ref }}
OTEL_RESOURCE_ATTRIBUTES: "benchmark_id=${{ env.BENCH_ID }},benchmark_run=feature-1,run_type=feature,git_ref=${{ steps.refs.outputs.feature-ref }}"
run: |
cat > "$BENCH_LABELS_FILE" <<LABELS
{"benchmark_run":"feature-1","run_type":"feature","git_ref":"${FEATURE_REF}","bench_sha":"${FEATURE_REF}","benchmark_id":"${BENCH_ID}","run_start_epoch":"$(date +%s)","reference_epoch":"${BENCH_REFERENCE_EPOCH}"}
@@ -895,9 +915,11 @@ jobs:
taskset -c 0 .github/scripts/bench-reth-run.sh feature ../reth-feature/target/profiling/reth "$BENCH_WORK_DIR/feature-1"
- name: "Run benchmark: feature (2/2)"
if: env.BENCH_ABBA != 'false'
id: run-feature-2
env:
FEATURE_REF: ${{ steps.refs.outputs.feature-ref }}
OTEL_RESOURCE_ATTRIBUTES: "benchmark_id=${{ env.BENCH_ID }},benchmark_run=feature-2,run_type=feature,git_ref=${{ steps.refs.outputs.feature-ref }}"
run: |
cat > "$BENCH_LABELS_FILE" <<LABELS
{"benchmark_run":"feature-2","run_type":"feature","git_ref":"${FEATURE_REF}","bench_sha":"${FEATURE_REF}","benchmark_id":"${BENCH_ID}","run_start_epoch":"$(date +%s)","reference_epoch":"${BENCH_REFERENCE_EPOCH}"}
@@ -905,9 +927,11 @@ jobs:
taskset -c 0 .github/scripts/bench-reth-run.sh feature ../reth-feature/target/profiling/reth "$BENCH_WORK_DIR/feature-2"
- name: "Run benchmark: baseline (2/2)"
if: env.BENCH_ABBA != 'false'
id: run-baseline-2
env:
BASELINE_REF: ${{ steps.refs.outputs.baseline-ref }}
OTEL_RESOURCE_ATTRIBUTES: "benchmark_id=${{ env.BENCH_ID }},benchmark_run=baseline-2,run_type=baseline,git_ref=${{ steps.refs.outputs.baseline-ref }}"
run: |
LAST_RUN_START=$(date +%s)
echo "BENCH_LAST_RUN_START=${LAST_RUN_START}" >> "$GITHUB_ENV"
@@ -1187,11 +1211,22 @@ jobs:
comment_id: parseInt(ackCommentId),
body,
});
} else {
// No PR — write results to job summary
await core.summary.addRaw(body).write();
}
- name: Write job summary
if: success()
uses: actions/github-script@v8
with:
script: |
const jobSummary = require('./.github/scripts/bench-job-summary.js');
await jobSummary({
core,
context,
chartSha: '${{ steps.push-charts.outputs.sha }}',
grafanaUrl: '${{ steps.metrics.outputs.grafana-url }}',
runId: '${{ github.run_id }}',
});
- name: Send Slack notification (success)
if: success() && env.BENCH_NO_SLACK != 'true'
uses: actions/github-script@v8

View File

@@ -6,7 +6,7 @@ on:
hive_target:
required: true
type: string
description: "Docker bake target to build (e.g. hive-stable, hive-edge)"
description: "Docker bake target to build (e.g. hive)"
artifact_name:
required: false
type: string

View File

@@ -28,6 +28,11 @@ on:
required: false
type: boolean
default: false
pgo:
description: "Enable PGO profiling"
required: false
type: boolean
default: false
pgo_blocks:
description: "Number of blocks to execute for PGO profiling"
required: false
@@ -36,14 +41,14 @@ on:
jobs:
collect-pgo-profile:
if: github.repository == 'paradigmxyz/reth'
if: github.repository == 'paradigmxyz/reth' && github.event_name == 'workflow_dispatch' && inputs.pgo
uses: ./.github/workflows/pgo-profile.yml
with:
pgo_blocks: ${{ github.event_name == 'workflow_dispatch' && inputs.pgo_blocks || '20' }}
pgo_blocks: ${{ inputs.pgo_blocks || '20' }}
secrets: inherit
build:
if: github.repository == 'paradigmxyz/reth'
if: github.repository == 'paradigmxyz/reth' && !failure() && !cancelled()
name: Build Docker images
runs-on: ubuntu-24.04
needs: collect-pgo-profile
@@ -72,6 +77,7 @@ jobs:
echo "dirty=false" >> "$GITHUB_OUTPUT"
- name: Download pre-collected PGO profile
if: ${{ github.event_name == 'workflow_dispatch' && inputs.pgo }}
uses: actions/download-artifact@v7
with:
name: pgo-profdata
@@ -80,13 +86,19 @@ jobs:
- name: Configure PGO build args
id: pgo
run: |
if [ ! -f dist/merged.profdata ]; then
echo "::error::Expected dist/merged.profdata from collect-pgo-profile job"
exit 1
if [[ "${{ github.event_name }}" == "workflow_dispatch" ]] && [[ "${{ inputs.pgo }}" == "true" ]]; then
if [ ! -f dist/merged.profdata ]; then
echo "::error::Expected dist/merged.profdata from collect-pgo-profile job"
exit 1
fi
echo "use_pgo_bolt=true" >> "$GITHUB_OUTPUT"
echo "pgo_profdata=dist/merged.profdata" >> "$GITHUB_OUTPUT"
echo "Using pre-collected PGO profile from collect-pgo-profile job"
else
echo "use_pgo_bolt=false" >> "$GITHUB_OUTPUT"
echo "pgo_profdata=" >> "$GITHUB_OUTPUT"
echo "PGO disabled"
fi
echo "use_pgo_bolt=true" >> "$GITHUB_OUTPUT"
echo "pgo_profdata=dist/merged.profdata" >> "$GITHUB_OUTPUT"
echo "Using pre-collected PGO profile from collect-pgo-profile job"
- name: Determine build parameters
id: params

View File

@@ -63,6 +63,6 @@ jobs:
run: |
cargo nextest run \
--no-fail-fast \
--locked --features "edge" \
--locked \
-p reth-e2e-test-utils \
-E 'binary(rocksdb)'

View File

@@ -15,18 +15,11 @@ concurrency:
cancel-in-progress: true
jobs:
build-reth-stable:
build-reth:
uses: ./.github/workflows/docker-test.yml
with:
hive_target: hive-stable
artifact_name: "reth-stable"
secrets: inherit
build-reth-edge:
uses: ./.github/workflows/docker-test.yml
with:
hive_target: hive-edge
artifact_name: "reth-edge"
hive_target: hive
artifact_name: "reth"
secrets: inherit
prepare-hive:
@@ -84,7 +77,6 @@ jobs:
strategy:
fail-fast: false
matrix:
storage: [stable, edge]
# ethereum/rpc to be deprecated:
# https://github.com/ethereum/hive/pull/1117
scenario:
@@ -184,10 +176,9 @@ jobs:
- sim: ethereum/eels/consume-rlp
limit: .*tests/paris.*
needs:
- build-reth-stable
- build-reth-edge
- build-reth
- prepare-hive
name: ${{ matrix.storage }} / ${{ matrix.scenario.sim }}${{ matrix.scenario.limit && format(' - {0}', matrix.scenario.limit) }}
name: ${{ matrix.scenario.sim }}${{ matrix.scenario.limit && format(' - {0}', matrix.scenario.limit) }}
# Use larger runners for eels tests to avoid OOM runner crashes
runs-on: ${{ github.repository == 'paradigmxyz/reth' && (contains(matrix.scenario.sim, 'eels') && 'depot-ubuntu-latest-8' || 'depot-ubuntu-latest-4') || 'ubuntu-latest' }}
permissions:
@@ -206,7 +197,7 @@ jobs:
- name: Download reth image
uses: actions/download-artifact@v8
with:
name: reth-${{ matrix.storage }}
name: reth
path: /tmp
- name: Load Docker images

View File

@@ -22,7 +22,7 @@ concurrency:
jobs:
test:
name: test / ${{ matrix.network }} / ${{ matrix.storage }}
name: test / ${{ matrix.network }}
if: github.event_name != 'schedule'
runs-on: ${{ github.repository == 'paradigmxyz/reth' && 'depot-ubuntu-latest-4' || 'ubuntu-latest' }}
env:
@@ -30,7 +30,6 @@ jobs:
strategy:
matrix:
network: ["ethereum"]
storage: ["stable", "edge"]
timeout-minutes: 60
steps:
- uses: actions/checkout@v6
@@ -47,7 +46,7 @@ jobs:
run: |
cargo nextest run \
--no-fail-fast \
--locked --features "asm-keccak ${{ matrix.network }} ${{ matrix.storage == 'edge' && 'edge' || '' }}" \
--locked --features "asm-keccak ${{ matrix.network }}" \
--workspace --exclude ef-tests \
-E "kind(test) and not binary(e2e_testsuite)"

View File

@@ -13,6 +13,10 @@ on:
description: "Enable dry run mode (builds artifacts but skips uploads and release creation)"
type: boolean
default: false
pgo:
description: "Enable PGO profiling"
type: boolean
default: false
pgo_blocks:
description: "Number of blocks to execute for PGO profiling on self-hosted runner"
type: string
@@ -154,12 +158,14 @@ jobs:
path: ${{ matrix.build.binary }}-${{ needs.extract-version.outputs.VERSION }}-${{ matrix.configs.target }}.tar.gz.asc
collect-pgo-profile:
if: github.event_name == 'workflow_dispatch' && inputs.pgo
uses: ./.github/workflows/pgo-profile.yml
with:
pgo_blocks: ${{ github.event_name == 'workflow_dispatch' && inputs.pgo_blocks || '20' }}
pgo_blocks: ${{ inputs.pgo_blocks || '20' }}
secrets: inherit
build-pgo:
if: github.event_name == 'workflow_dispatch' && inputs.pgo
name: build release (x86_64-linux PGO+BOLT)
runs-on: [self-hosted, Linux, X64]
needs: [extract-version, collect-pgo-profile]
@@ -237,7 +243,7 @@ jobs:
name: draft release
runs-on: ubuntu-latest
needs: [build, build-pgo, extract-version]
if: ${{ github.event.inputs.dry_run != 'true' }}
if: ${{ !failure() && !cancelled() && github.event.inputs.dry_run != 'true' }}
env:
VERSION: ${{ needs.extract-version.outputs.VERSION }}
permissions:

View File

@@ -19,15 +19,13 @@ concurrency:
jobs:
test:
name: test / ${{ matrix.type }} / ${{ matrix.storage }}
name: test / ${{ matrix.type }}
runs-on: ${{ github.repository == 'paradigmxyz/reth' && 'depot-ubuntu-latest-4' || 'ubuntu-latest' }}
env:
RUST_BACKTRACE: 1
EDGE_FEATURES: ${{ matrix.storage == 'edge' && 'edge' || '' }}
strategy:
matrix:
type: [ethereum]
storage: [stable, edge]
include:
- type: ethereum
features: asm-keccak ethereum
@@ -50,14 +48,14 @@ jobs:
run: |
cargo nextest run \
--no-fail-fast \
--features "${{ matrix.features }} $EDGE_FEATURES" --locked \
--features "${{ matrix.features }}" --locked \
${{ matrix.exclude_args }} --workspace \
--exclude ef-tests --no-tests=warn \
-E "!kind(test) and not binary(e2e_testsuite)"
state:
name: Ethereum state tests
runs-on: ${{ github.repository == 'paradigmxyz/reth' && 'depot-ubuntu-latest-4' || 'ubuntu-latest' }}
runs-on: ${{ github.repository == 'paradigmxyz/reth' && 'depot-ubuntu-latest-8' || 'ubuntu-latest' }}
env:
RUST_LOG: info,sync=error
RUST_BACKTRACE: 1

195
Cargo.lock generated
View File

@@ -1003,9 +1003,9 @@ checksum = "4b46cbb362ab8752921c97e041f5e366ee6297bd428a31275b9fcf1e380f7299"
[[package]]
name = "anstream"
version = "0.6.21"
version = "1.0.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "43d5b281e737544384e969a5ccad3f1cdd24b48086a0fc1b2a5262a26b8f4f4a"
checksum = "824a212faf96e9acacdbd09febd34438f8f711fb84e09a8916013cd7815ca28d"
dependencies = [
"anstyle",
"anstyle-parse",
@@ -1018,15 +1018,15 @@ dependencies = [
[[package]]
name = "anstyle"
version = "1.0.13"
version = "1.0.14"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5192cca8006f1fd4f7237516f40fa183bb07f8fbdfedaa0036de5ea9b0b45e78"
checksum = "940b3a0ca603d1eade50a4846a2afffd5ef57a9feac2c0e2ec2e14f9ead76000"
[[package]]
name = "anstyle-parse"
version = "0.2.7"
version = "1.0.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "4e7644824f0aa2c7b9384579234ef10eb7efb6a0deb83f9630a49594dd9c15c2"
checksum = "52ce7f38b242319f7cabaa6813055467063ecdc9d355bbb4ce0c68908cd8130e"
dependencies = [
"utf8parse",
]
@@ -1037,7 +1037,7 @@ version = "1.1.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "40c48f72fd53cd289104fc64099abca73db4166ad86ea0b4341abe65af83dadc"
dependencies = [
"windows-sys 0.61.2",
"windows-sys 0.60.2",
]
[[package]]
@@ -1048,7 +1048,7 @@ checksum = "291e6a250ff86cd4a820112fb8898808a366d8f9f58ce16d1f538353ad55747d"
dependencies = [
"anstyle",
"once_cell_polyfill",
"windows-sys 0.61.2",
"windows-sys 0.60.2",
]
[[package]]
@@ -2026,9 +2026,9 @@ dependencies = [
[[package]]
name = "c-kzg"
version = "2.1.6"
version = "2.1.7"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1a0f582957c24870b7bfd12bf562c40b4734b533cafbaf8ded31d6d85f462c01"
checksum = "6648ed1e4ea8e8a1a4a2c78e1cda29a3fd500bc622899c340d8525ea9a76b24a"
dependencies = [
"arbitrary",
"blst",
@@ -2113,9 +2113,9 @@ dependencies = [
[[package]]
name = "cc"
version = "1.2.56"
version = "1.2.57"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "aebf35691d1bfb0ac386a69bac2fde4dd276fb618cf8bf4f5318fe285e821bb2"
checksum = "7a0dd1ca384932ff3641c8718a02769f1698e7563dc6974ffd03346116310423"
dependencies = [
"find-msvc-tools",
"jobserver",
@@ -2214,9 +2214,9 @@ dependencies = [
[[package]]
name = "clap"
version = "4.5.60"
version = "4.6.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "2797f34da339ce31042b27d23607e051786132987f595b02ba4f6a6dffb7030a"
checksum = "b193af5b67834b676abd72466a96c1024e6a6ad978a1f484bd90b85c94041351"
dependencies = [
"clap_builder",
"clap_derive",
@@ -2224,9 +2224,9 @@ dependencies = [
[[package]]
name = "clap_builder"
version = "4.5.60"
version = "4.6.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "24a241312cea5059b13574bb9b3861cabf758b879c15190b37b6d6fd63ab6876"
checksum = "714a53001bf66416adb0e2ef5ac857140e7dc3a0c48fb28b2f10762fc4b5069f"
dependencies = [
"anstream",
"anstyle",
@@ -2236,9 +2236,9 @@ dependencies = [
[[package]]
name = "clap_derive"
version = "4.5.55"
version = "4.6.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a92793da1a46a5f2a02a6f4c46c6496b28c43638adea8306fcb0caa1634f24e5"
checksum = "1110bd8a634a1ab8cb04345d8d878267d57c3cf1b38d91b71af6686408bbca6a"
dependencies = [
"heck",
"proc-macro2",
@@ -2248,9 +2248,9 @@ dependencies = [
[[package]]
name = "clap_lex"
version = "1.0.0"
version = "1.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "3a822ea5bc7590f9d40f1ba12c0dc3c2760f3482c6984db1573ad11031420831"
checksum = "c8d4a3bb8b1e0c1050499d1815f5ab16d04f0959b233085fb31653fbfc9d98f9"
[[package]]
name = "cmake"
@@ -2272,9 +2272,9 @@ dependencies = [
[[package]]
name = "codspeed"
version = "4.3.0"
version = "4.4.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "38c2eb3388ebe26b5a0ab6bf4969d9c4840143d7f6df07caa3cc851b0606cef6"
checksum = "b684e94583e85a5ca7e1a6454a89d76a5121240f2fb67eb564129d9bafdb9db0"
dependencies = [
"anyhow",
"cc",
@@ -2282,7 +2282,7 @@ dependencies = [
"getrandom 0.2.17",
"glob",
"libc",
"nix 0.30.1",
"nix",
"serde",
"serde_json",
"statrs",
@@ -2290,9 +2290,9 @@ dependencies = [
[[package]]
name = "codspeed-criterion-compat"
version = "4.3.0"
version = "4.4.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e1e270597a1d1e183f86d1cc9f94f0133654ee3daf201c17903ee29363555dd7"
checksum = "2e65444156eb73ad7f57618188f8d4a281726d133ef55b96d1dcff89528609ab"
dependencies = [
"clap",
"codspeed",
@@ -2303,9 +2303,9 @@ dependencies = [
[[package]]
name = "codspeed-criterion-compat-walltime"
version = "4.3.0"
version = "4.4.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e6c2613d2fac930fe34456be76f9124ee0800bb9db2e7fd2d6c65b9ebe98a292"
checksum = "96389aaa4bbb872ea4924dc0335b2bb181bcf28d6eedbe8fea29afcc5bde36a6"
dependencies = [
"anes",
"cast",
@@ -2408,9 +2408,9 @@ dependencies = [
[[package]]
name = "colorchoice"
version = "1.0.4"
version = "1.0.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b05b61dc5112cbb17e4b6cd61790d9845d13888356391624cbe7e41efeac1e75"
checksum = "1d07550c9036bf2ae0c684c4297d503f838287c83c53686d05370d0e139ae570"
[[package]]
name = "colored"
@@ -2782,7 +2782,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e0b1fab2ae45819af2d0731d60f2afe17227ebb1a1538a236da84c93e9a60162"
dependencies = [
"dispatch2",
"nix 0.31.2",
"nix",
"windows-sys 0.61.2",
]
@@ -2970,7 +2970,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7ab67060fc6b8ef687992d439ca0fa36e7ed17e9a0b16b25b601e8757df720de"
dependencies = [
"data-encoding",
"syn 2.0.117",
"syn 1.0.109",
]
[[package]]
@@ -3023,9 +3023,9 @@ dependencies = [
[[package]]
name = "derive-where"
version = "1.6.0"
version = "1.6.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ef941ded77d15ca19b40374869ac6000af1c9f2a4c0f3d4c70926287e6364a8f"
checksum = "d08b3a0bcc0d079199cd476b2cae8435016ec11d1c0986c6901c5ac223041534"
dependencies = [
"proc-macro2",
"quote",
@@ -3472,7 +3472,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "39cab71617ae0d63f51a36d69f866391735b51691dbda63cf6f96d042b63efeb"
dependencies = [
"libc",
"windows-sys 0.61.2",
"windows-sys 0.52.0",
]
[[package]]
@@ -4286,7 +4286,7 @@ dependencies = [
"libc",
"log",
"rustversion",
"windows-link 0.2.1",
"windows-link 0.1.3",
"windows-result 0.4.1",
]
@@ -4825,7 +4825,7 @@ dependencies = [
"libc",
"percent-encoding",
"pin-project-lite",
"socket2 0.6.3",
"socket2 0.5.10",
"tokio",
"tower-service",
"tracing",
@@ -5195,7 +5195,7 @@ checksum = "3640c1c38b8e4e43584d8df18be5fc6b0aa314ce6ebf51b53313d4306cca8e46"
dependencies = [
"hermit-abi",
"libc",
"windows-sys 0.61.2",
"windows-sys 0.52.0",
]
[[package]]
@@ -5500,9 +5500,9 @@ dependencies = [
[[package]]
name = "kasuari"
version = "0.4.11"
version = "0.4.12"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8fe90c1150662e858c7d5f945089b7517b0a80d8bf7ba4b1b5ffc984e7230a5b"
checksum = "bde5057d6143cc94e861d90f591b9303d6716c6b9602309150bd068853c10899"
dependencies = [
"hashbrown 0.16.1",
"portable-atomic",
@@ -5654,9 +5654,9 @@ dependencies = [
[[package]]
name = "libz-sys"
version = "1.1.24"
version = "1.1.25"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "4735e9cbde5aac84a5ce588f6b23a90b9b0b528f6c5a8db8a4aff300463a0839"
checksum = "d52f4c29e2a68ac30c9087e1b772dc9f44a2b66ed44edf2266cf2be9b03dafc1"
dependencies = [
"cc",
"libc",
@@ -5733,7 +5733,7 @@ dependencies = [
"generator",
"scoped-tls",
"tracing",
"tracing-subscriber 0.3.22",
"tracing-subscriber 0.3.23",
]
[[package]]
@@ -5781,9 +5781,9 @@ dependencies = [
[[package]]
name = "lz4_flex"
version = "0.12.0"
version = "0.12.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ab6473172471198271ff72e9379150e9dfd70d8e533e0752a27e515b48dd375e"
checksum = "98c23545df7ecf1b16c303910a69b079e8e251d60f7dd2cc9b4177f2afaf1746"
[[package]]
name = "mach2"
@@ -6086,18 +6086,6 @@ dependencies = [
"unsigned-varint",
]
[[package]]
name = "nix"
version = "0.30.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "74523f3a35e05aba87a1d978330aef40f67b0304ac79c1c00b294c9830543db6"
dependencies = [
"bitflags 2.11.0",
"cfg-if",
"cfg_aliases",
"libc",
]
[[package]]
name = "nix"
version = "0.31.2"
@@ -6162,7 +6150,7 @@ version = "0.50.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7957b9740744892f114936ab4a57b3f487491bbeafaf8083688b16841a4240e5"
dependencies = [
"windows-sys 0.61.2",
"windows-sys 0.59.0",
]
[[package]]
@@ -6313,9 +6301,9 @@ dependencies = [
[[package]]
name = "objc2-core-foundation"
version = "0.3.1"
version = "0.3.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1c10c2894a6fed806ade6027bcd50662746363a9589d3ec9d9bef30a4e4bc166"
checksum = "2a180dd8642fa45cdb7dd721cd4c11b1cadd4929ce112ebd8b9f5803cc79d536"
dependencies = [
"bitflags 2.11.0",
]
@@ -6328,9 +6316,9 @@ checksum = "ef25abbcd74fb2609453eb695bd2f860d389e457f67dc17cafc8b8cbc89d0c33"
[[package]]
name = "objc2-io-kit"
version = "0.3.1"
version = "0.3.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "71c1c64d6120e51cd86033f67176b1cb66780c2efe34dec55176f77befd93c0a"
checksum = "33fafba39597d6dc1fb709123dfa8289d39406734be322956a69f0931c73bb15"
dependencies = [
"libc",
"objc2-core-foundation",
@@ -6347,9 +6335,9 @@ dependencies = [
[[package]]
name = "once_cell"
version = "1.21.3"
version = "1.21.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "42f5e15c9953c5e4ccceeb2e7382a716482c34515315f7b03532b8b4e8393d2d"
checksum = "9f7c3e4beb33f85d45ae3e3a1792185706c8e16d043238c593331cc7cd313b50"
dependencies = [
"critical-section",
"portable-atomic",
@@ -6518,7 +6506,7 @@ dependencies = [
"opentelemetry",
"tracing",
"tracing-core",
"tracing-subscriber 0.3.22",
"tracing-subscriber 0.3.23",
]
[[package]]
@@ -7148,7 +7136,7 @@ dependencies = [
"quinn-udp",
"rustc-hash",
"rustls",
"socket2 0.6.3",
"socket2 0.5.10",
"thiserror 2.0.18",
"tokio",
"tracing",
@@ -7185,7 +7173,7 @@ dependencies = [
"cfg_aliases",
"libc",
"once_cell",
"socket2 0.6.3",
"socket2 0.5.10",
"tracing",
"windows-sys 0.60.2",
]
@@ -7721,7 +7709,7 @@ dependencies = [
"csv",
"ctrlc",
"eyre",
"nix 0.31.2",
"nix",
"reth-chainspec",
"reth-cli-runner",
"reth-cli-util",
@@ -8739,6 +8727,7 @@ dependencies = [
"reth-basic-payload-builder",
"reth-chainspec",
"reth-consensus-common",
"reth-engine-tree",
"reth-errors",
"reth-ethereum-primitives",
"reth-evm",
@@ -8751,6 +8740,7 @@ dependencies = [
"reth-revm",
"reth-storage-api",
"reth-transaction-pool",
"reth-trie",
"revm",
"tracing",
]
@@ -9651,6 +9641,7 @@ version = "1.11.3"
dependencies = [
"alloy-consensus",
"alloy-eips",
"alloy-genesis",
"alloy-primitives",
"alloy-rpc-types-engine",
"assert_matches",
@@ -10434,7 +10425,7 @@ dependencies = [
"tracing-journald",
"tracing-logfmt",
"tracing-samply",
"tracing-subscriber 0.3.22",
"tracing-subscriber 0.3.23",
"tracing-tracy",
"tracy-client",
]
@@ -10452,7 +10443,7 @@ dependencies = [
"opentelemetry_sdk",
"tracing",
"tracing-opentelemetry",
"tracing-subscriber 0.3.22",
"tracing-subscriber 0.3.23",
"url",
]
@@ -11116,7 +11107,7 @@ dependencies = [
"errno",
"libc",
"linux-raw-sys",
"windows-sys 0.61.2",
"windows-sys 0.52.0",
]
[[package]]
@@ -11235,9 +11226,9 @@ dependencies = [
[[package]]
name = "schannel"
version = "0.1.28"
version = "0.1.29"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "891d81b926048e76efe18581bf793546b4c0eaf8448d72be8de2bbee5fd166e1"
checksum = "91c1b7e4904c873ef0710c1f407dde2e6287de2bebc1bbbf7d430bb7cbffd939"
dependencies = [
"windows-sys 0.61.2",
]
@@ -11510,9 +11501,9 @@ dependencies = [
[[package]]
name = "serde_with"
version = "3.17.0"
version = "3.18.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "381b283ce7bc6b476d903296fb59d0d36633652b633b27f64db4fb46dcbfc3b9"
checksum = "dd5414fad8e6907dbdd5bc441a50ae8d6e26151a03b1de04d89a5576de61d01f"
dependencies = [
"base64 0.22.1",
"chrono",
@@ -11529,11 +11520,11 @@ dependencies = [
[[package]]
name = "serde_with_macros"
version = "3.17.0"
version = "3.18.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a6d4e30573c8cb306ed6ab1dca8423eec9a463ea0e155f45399455e0368b27e0"
checksum = "d3db8978e608f1fe7357e211969fd9abdcae80bac1ba7a3369bb7eb6b404eb65"
dependencies = [
"darling 0.21.3",
"darling 0.23.0",
"proc-macro2",
"quote",
"syn 2.0.117",
@@ -11773,7 +11764,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "3a766e1110788c36f4fa1c2b71b387a7815aa65f88ce0229841826633d93723e"
dependencies = [
"libc",
"windows-sys 0.61.2",
"windows-sys 0.60.2",
]
[[package]]
@@ -11922,9 +11913,9 @@ dependencies = [
[[package]]
name = "sysinfo"
version = "0.38.3"
version = "0.38.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d03c61d2a49c649a15c407338afe7accafde9dac869995dccb73e5f7ef7d9034"
checksum = "92ab6a2f8bfe508deb3c6406578252e491d299cbbf3bc0529ecc3313aee4a52f"
dependencies = [
"libc",
"memchr",
@@ -11965,15 +11956,15 @@ dependencies = [
[[package]]
name = "tempfile"
version = "3.26.0"
version = "3.27.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "82a72c767771b47409d2345987fda8628641887d5466101319899796367354a0"
checksum = "32497e9a4c7b38532efcdebeef879707aa9f794296a4f0244f6f69e9bc8574bd"
dependencies = [
"fastrand 2.3.0",
"getrandom 0.4.2",
"once_cell",
"rustix",
"windows-sys 0.61.2",
"windows-sys 0.52.0",
]
[[package]]
@@ -12227,9 +12218,9 @@ dependencies = [
[[package]]
name = "tinyvec"
version = "1.10.0"
version = "1.11.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "bfa5fdc3bce6191a1dbc8c02d5c8bffcf557bafa17c124c5264a458f1b0613fa"
checksum = "3e61e67053d25a4e82c844e8424039d9745781b3fc4f32b8d55ed50f5f667ef3"
dependencies = [
"tinyvec_macros",
]
@@ -12314,7 +12305,11 @@ checksum = "d25a406cddcc431a75d3d9afc6a7c0f7428d4891dd973e4d54c56b46127bf857"
dependencies = [
"futures-util",
"log",
"rustls",
"rustls-native-certs",
"rustls-pki-types",
"tokio",
"tokio-rustls",
"tungstenite 0.28.0",
]
@@ -12514,7 +12509,7 @@ dependencies = [
"crossbeam-channel",
"thiserror 2.0.18",
"time",
"tracing-subscriber 0.3.22",
"tracing-subscriber 0.3.23",
]
[[package]]
@@ -12545,7 +12540,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8b1581020d7a273442f5b45074a6a57d5757ad0a47dac0e9f0bd57b81936f3db"
dependencies = [
"tracing",
"tracing-subscriber 0.3.22",
"tracing-subscriber 0.3.23",
]
[[package]]
@@ -12566,7 +12561,7 @@ checksum = "2d3a81ed245bfb62592b1e2bc153e77656d94ee6a0497683a65a12ccaf2438d0"
dependencies = [
"libc",
"tracing-core",
"tracing-subscriber 0.3.22",
"tracing-subscriber 0.3.23",
]
[[package]]
@@ -12589,7 +12584,7 @@ dependencies = [
"time",
"tracing",
"tracing-core",
"tracing-subscriber 0.3.22",
"tracing-subscriber 0.3.23",
]
[[package]]
@@ -12604,7 +12599,7 @@ dependencies = [
"tracing",
"tracing-core",
"tracing-log",
"tracing-subscriber 0.3.22",
"tracing-subscriber 0.3.23",
"web-time",
]
@@ -12621,7 +12616,7 @@ dependencies = [
"memmap2",
"smallvec",
"tracing-core",
"tracing-subscriber 0.3.22",
"tracing-subscriber 0.3.23",
]
[[package]]
@@ -12645,9 +12640,9 @@ dependencies = [
[[package]]
name = "tracing-subscriber"
version = "0.3.22"
version = "0.3.23"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "2f30143827ddab0d256fd843b7a66d164e9f271cfa0dde49142c5ca0ca291f1e"
checksum = "cb7f578e5945fb242538965c2d0b04418d38ec25c79d160cd279bf0731c8d319"
dependencies = [
"matchers",
"nu-ansi-term",
@@ -12671,7 +12666,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0eaa1852afa96e0fe9e44caa53dc0bd2d9d05e0f2611ce09f97f8677af56e4ba"
dependencies = [
"tracing-core",
"tracing-subscriber 0.3.22",
"tracing-subscriber 0.3.23",
"tracy-client",
]
@@ -12769,6 +12764,8 @@ dependencies = [
"httparse",
"log",
"rand 0.9.2",
"rustls",
"rustls-pki-types",
"sha1",
"thiserror 2.0.18",
"utf-8",
@@ -13271,7 +13268,7 @@ version = "0.1.11"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c2a7b1c03c876122aa43f3020e6c3c3ee5c05081c9a00739faf7503aeba10d22"
dependencies = [
"windows-sys 0.61.2",
"windows-sys 0.48.0",
]
[[package]]
@@ -13970,18 +13967,18 @@ dependencies = [
[[package]]
name = "zerocopy"
version = "0.8.41"
version = "0.8.42"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "96e13bc581734df6250836c59a5f44f3c57db9f9acb9dc8e3eaabdaf6170254d"
checksum = "f2578b716f8a7a858b7f02d5bd870c14bf4ddbbcf3a4c05414ba6503640505e3"
dependencies = [
"zerocopy-derive",
]
[[package]]
name = "zerocopy-derive"
version = "0.8.41"
version = "0.8.42"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "3545ea9e86d12ab9bba9fcd99b54c1556fd3199007def5a03c375623d05fac1c"
checksum = "7e6cc098ea4d3bd6246687de65af3f920c430e236bee1e3bf2e441463f08a02f"
dependencies = [
"proc-macro2",
"quote",

View File

@@ -444,7 +444,6 @@ revm-state = { version = "10.0.0", default-features = false }
revm-primitives = { version = "22.1.0", default-features = false }
revm-interpreter = { version = "34.0.0", default-features = false }
revm-database-interface = { version = "10.0.0", default-features = false }
op-revm = { version = "17.0.0", default-features = false }
revm-inspectors = "0.36.0"
# eth

View File

@@ -89,7 +89,6 @@ default = [
"keccak-cache-global",
"asm-keccak",
"min-debug-logs",
"rocksdb",
]
otlp = [
@@ -125,6 +124,7 @@ jemalloc = [
"reth-node-core/jemalloc",
"reth-node-metrics/jemalloc",
"reth-ethereum-cli/jemalloc",
"reth-provider/jemalloc",
]
jemalloc-prof = [
"reth-cli-util/jemalloc",
@@ -191,8 +191,6 @@ min-trace-logs = [
]
trie-debug = ["reth-node-builder/trie-debug", "reth-node-core/trie-debug"]
rocksdb = ["reth-ethereum-cli/rocksdb", "reth-node-core/rocksdb"]
edge = ["rocksdb"]
[[bin]]
name = "reth"

View File

@@ -992,7 +992,7 @@ impl<N: NodePrimitives<SignedTx: SignedTransaction>> NewCanonicalChain<N> {
///
/// Returns the new tip for [`Self::Reorg`] and [`Self::Commit`] variants which commit at least
/// 1 new block.
pub fn tip(&self) -> &SealedBlock<N::Block> {
pub fn tip(&self) -> &RecoveredBlock<N::Block> {
match self {
Self::Commit { new } | Self::Reorg { new, .. } => {
new.last().expect("non empty blocks").recovered_block()

View File

@@ -36,15 +36,6 @@ pub use spec::{
DepositContract, ForkBaseFeeParams, DEV, HOLESKY, HOODI, MAINNET, SEPOLIA,
};
use reth_primitives_traits::sync::OnceLock;
/// Simple utility to create a thread-safe sync cell with a value set.
pub fn once_cell_set<T>(value: T) -> OnceLock<T> {
let once = OnceLock::new();
let _ = once.set(value);
once
}
#[cfg(test)]
mod tests {
use super::*;

View File

@@ -110,7 +110,6 @@ reth-provider = { workspace = true, features = ["test-utils"] }
tempfile.workspace = true
[features]
default = []
arbitrary = [
"dep:proptest",
"dep:arbitrary",
@@ -135,6 +134,3 @@ arbitrary = [
"reth-primitives-traits/arbitrary",
"reth-ethereum-primitives/arbitrary",
]
rocksdb = ["reth-db-common/rocksdb", "reth-stages/rocksdb", "reth-provider/rocksdb", "reth-prune/rocksdb"]
edge = ["rocksdb"]

View File

@@ -187,7 +187,6 @@ impl<C: ChainSpecParser> EnvironmentArgs<C> {
where
C: ChainSpecParser<ChainSpec = N::ChainSpec>,
{
let prune_modes = config.prune.segments.clone();
let factory = ProviderFactory::<NodeTypesWithDBAdapter<N, DatabaseEnv>>::new(
db,
self.chain.clone(),
@@ -195,7 +194,8 @@ impl<C: ChainSpecParser> EnvironmentArgs<C> {
rocksdb_provider,
runtime,
)?
.with_prune_modes(prune_modes.clone());
.with_prune_modes(config.prune.segments.clone())
.with_minimum_pruning_distance(config.prune.minimum_pruning_distance);
// Check for consistency between database and static files.
if !access.is_read_only_inconsistent() &&
@@ -229,10 +229,13 @@ impl<C: ChainSpecParser> EnvironmentArgs<C> {
NoopBodiesDownloader::default(),
NoopEvmConfig::<N::Evm>::default(),
config.stages.clone(),
prune_modes.clone(),
config.prune.segments.clone(),
None,
))
.build(factory.clone(), StaticFileProducer::new(factory.clone(), prune_modes));
.build(
factory.clone(),
StaticFileProducer::new(factory.clone(), config.prune.segments.clone()),
);
// Move all applicable data from database to static files.
pipeline.move_to_static_files()?;

View File

@@ -21,7 +21,6 @@ use std::{
};
use tracing::{info, warn};
#[cfg(all(unix, feature = "rocksdb"))]
mod rocksdb;
/// Interval for logging progress during checksum computation.
@@ -73,7 +72,6 @@ enum Subcommand {
limit: Option<usize>,
},
/// Calculates the checksum of a RocksDB table
#[cfg(all(unix, feature = "rocksdb"))]
Rocksdb {
/// The RocksDB table
#[arg(value_enum)]
@@ -100,7 +98,6 @@ impl Command {
Subcommand::StaticFile { segment, start_block, end_block, limit } => {
checksum_static_file(tool, segment, start_block, end_block, limit)?;
}
#[cfg(all(unix, feature = "rocksdb"))]
Subcommand::Rocksdb { table, limit } => {
rocksdb::checksum_rocksdb(tool, table, limit)?;
}

View File

@@ -199,7 +199,7 @@ pub(crate) fn config_for_selections(
}
return Config {
prune: PruneConfig { block_interval: PruneConfig::default().block_interval, segments },
prune: PruneConfig { segments, ..Default::default() },
static_files,
..Default::default()
};

View File

@@ -19,11 +19,12 @@ use reth_node_api::BlockTy;
use reth_node_events::node::NodeEvent;
use reth_provider::{
providers::ProviderNodeTypes, BlockNumReader, HeaderProvider, ProviderError, ProviderFactory,
StageCheckpointReader,
RocksDBProviderFactory, StageCheckpointReader,
};
use reth_prune::PruneModes;
use reth_stages::{prelude::*, ControlFlow, Pipeline, StageId, StageSet};
use reth_static_file::StaticFileProducer;
use reth_storage_api::StorageSettingsCache;
use std::{path::Path, sync::Arc};
use tokio::sync::watch;
use tracing::{debug, error, info, warn};
@@ -108,7 +109,11 @@ where
let provider = provider_factory.provider()?;
let init_blocks = provider.tx_ref().entries::<tables::HeaderNumbers>()?;
let init_txns = provider.tx_ref().entries::<tables::TransactionHashNumbers>()?;
let init_txns = if provider_factory.cached_storage_settings().storage_v2 {
provider_factory.rocksdb_provider().iter::<tables::TransactionHashNumbers>()?.count()
} else {
provider.tx_ref().entries::<tables::TransactionHashNumbers>()?
};
drop(provider);
let mut total_decoded_blocks = 0;
@@ -215,8 +220,12 @@ where
let provider = provider_factory.provider()?;
let total_imported_blocks = provider.tx_ref().entries::<tables::HeaderNumbers>()? - init_blocks;
let total_imported_txns =
provider.tx_ref().entries::<tables::TransactionHashNumbers>()? - init_txns;
let current_txns = if provider_factory.cached_storage_settings().storage_v2 {
provider_factory.rocksdb_provider().iter::<tables::TransactionHashNumbers>()?.count()
} else {
provider.tx_ref().entries::<tables::TransactionHashNumbers>()?
};
let total_imported_txns = current_txns - init_txns;
let result = ImportResult {
total_decoded_blocks,

View File

@@ -193,11 +193,15 @@ impl<C: ChainSpecParser> DownloadArgs<C> {
let default_secret_key_path = data_dir.p2p_secret();
let p2p_secret_key = self.network.secret_key(default_secret_key_path)?;
let rlpx_socket = (self.network.addr, self.network.port).into();
let boot_nodes = self.chain.bootnodes().unwrap_or_default();
let boot_nodes = self
.network
.resolved_bootnodes()
.unwrap_or_else(|| self.chain.bootnodes().unwrap_or_default());
let net =
NetworkConfigBuilder::<N::NetworkPrimitives>::new(p2p_secret_key, Runtime::test())
.peer_config(config.peers_config_with_basic_nodes_from_file(None))
.sessions_config(config.sessions)
.external_ip_resolver(self.network.nat.clone())
.network_id(self.network.network_id)
.boot_nodes(boot_nodes.clone())

View File

@@ -12,7 +12,6 @@ use reth_node_metrics::{
server::{MetricServer, MetricServerConfig},
version::VersionInfo,
};
#[cfg(all(unix, feature = "rocksdb"))]
use reth_provider::RocksDBProviderFactory;
use reth_prune::PrunerBuilder;
use reth_static_file::StaticFileProducer;
@@ -122,7 +121,6 @@ impl<C: ChainSpecParser<ChainSpec: EthChainSpec + EthereumHardforks>> PruneComma
}
// Flush and compact RocksDB to reclaim disk space after pruning
#[cfg(all(unix, feature = "rocksdb"))]
{
info!(target: "reth::cli", "Flushing and compacting RocksDB...");
provider_factory.rocksdb_provider().flush_and_compact()?;

View File

@@ -1,6 +1,6 @@
//! Configuration files.
use reth_network_types::{PeersConfig, SessionsConfig};
use reth_prune_types::PruneModes;
use reth_prune_types::{PruneModes, MINIMUM_UNWIND_SAFE_DISTANCE};
use reth_stages_types::ExecutionStageThresholds;
use reth_static_file_types::{StaticFileMap, StaticFileSegment};
use std::{
@@ -540,11 +540,24 @@ pub struct PruneConfig {
/// Pruning configuration for every part of the data that can be pruned.
#[cfg_attr(feature = "serde", serde(alias = "parts"))]
pub segments: PruneModes,
/// Minimum distance from the tip required for pruning. Controls the safety margin for
/// reorgs and manual unwinds. Defaults to [`MINIMUM_UNWIND_SAFE_DISTANCE`].
#[cfg_attr(feature = "serde", serde(default = "default_minimum_pruning_distance"))]
pub minimum_pruning_distance: u64,
}
/// Returns the default minimum pruning distance.
const fn default_minimum_pruning_distance() -> u64 {
MINIMUM_UNWIND_SAFE_DISTANCE
}
impl Default for PruneConfig {
fn default() -> Self {
Self { block_interval: DEFAULT_BLOCK_INTERVAL, segments: PruneModes::default() }
Self {
block_interval: DEFAULT_BLOCK_INTERVAL,
segments: PruneModes::default(),
minimum_pruning_distance: MINIMUM_UNWIND_SAFE_DISTANCE,
}
}
}
@@ -577,6 +590,7 @@ impl PruneConfig {
bodies_history,
receipts_log_filter,
},
minimum_pruning_distance,
} = other;
// Merge block_interval, only update if it's the default interval
@@ -584,6 +598,11 @@ impl PruneConfig {
self.block_interval = block_interval;
}
// Merge minimum_pruning_distance, only update if it's the default
if self.minimum_pruning_distance == MINIMUM_UNWIND_SAFE_DISTANCE {
self.minimum_pruning_distance = minimum_pruning_distance;
}
// Merge the various segment prune modes
self.segments.sender_recovery = self.segments.sender_recovery.or(sender_recovery);
self.segments.transaction_lookup = self.segments.transaction_lookup.or(transaction_lookup);
@@ -623,7 +642,9 @@ mod tests {
use crate::PruneConfig;
use alloy_primitives::Address;
use reth_network_peers::TrustedPeer;
use reth_prune_types::{PruneMode, PruneModes, ReceiptsLogPruneConfig};
use reth_prune_types::{
PruneMode, PruneModes, ReceiptsLogPruneConfig, MINIMUM_UNWIND_SAFE_DISTANCE,
};
use std::{collections::BTreeMap, path::Path, str::FromStr, time::Duration};
fn with_tempdir(filename: &str, proc: fn(&std::path::Path)) {
@@ -1093,6 +1114,7 @@ receipts = { distance = 16384 }
fn test_prune_config_merge() {
let mut config1 = PruneConfig {
block_interval: 5,
minimum_pruning_distance: MINIMUM_UNWIND_SAFE_DISTANCE,
segments: PruneModes {
sender_recovery: Some(PruneMode::Full),
transaction_lookup: None,
@@ -1109,6 +1131,7 @@ receipts = { distance = 16384 }
let config2 = PruneConfig {
block_interval: 10,
minimum_pruning_distance: MINIMUM_UNWIND_SAFE_DISTANCE,
segments: PruneModes {
sender_recovery: Some(PruneMode::Distance(500)),
transaction_lookup: Some(PruneMode::Full),

View File

@@ -75,8 +75,3 @@ path = "tests/e2e-testsuite/main.rs"
[[test]]
name = "rocksdb"
path = "tests/rocksdb/main.rs"
required-features = ["rocksdb"]
[features]
rocksdb = ["reth-node-core/rocksdb", "reth-provider/rocksdb", "reth-cli-commands/rocksdb"]
edge = ["rocksdb"]

View File

@@ -96,6 +96,11 @@ where
self
}
/// Sets the pruning arguments for the test nodes.
pub fn with_pruning(self, pruning: reth_node_core::args::PruningArgs) -> Self {
self.with_node_config_modifier(move |config| config.with_pruning(pruning.clone()))
}
/// Enables v2 storage defaults (`--storage.v2`), routing tx hashes, history
/// indices, etc. to `RocksDB` and changesets/senders to static files.
pub fn with_storage_v2(self) -> Self {

View File

@@ -1,7 +1,5 @@
//! E2E tests for `RocksDB` provider functionality.
#![cfg(all(feature = "rocksdb", unix))]
use alloy_consensus::BlockHeader;
use alloy_primitives::B256;
use alloy_rpc_types_eth::{Transaction, TransactionReceipt};

View File

@@ -139,13 +139,6 @@ trie-debug = [
"reth-engine-primitives/trie-debug",
"dep:serde_json",
]
rocksdb = [
"reth-provider/rocksdb",
"reth-prune/rocksdb",
"reth-stages?/rocksdb",
"reth-e2e-test-utils/rocksdb",
]
edge = ["rocksdb"]
[[test]]
name = "e2e_testsuite"

View File

@@ -177,18 +177,25 @@ where
}
}
provider_rw.commit()?;
debug!(target: "engine::persistence", first=?first_block, last=?last_block, "Saved range of blocks");
// Run the pruner in a separate provider so it reads committed RocksDB state
// that includes the history entries written by save_blocks above.
//
// The pruner reads the indices from rocksdb, filters it, and writes to indices, so it
// must be able to read anything written by save_blocks.
if self.pruner.is_pruning_needed(last.number) {
debug!(target: "engine::persistence", block_num=?last.number, "Running pruner");
let prune_start = Instant::now();
let provider_rw = self.provider.database_provider_rw()?;
let _ = self.pruner.run_with_provider(&provider_rw, last.number)?;
provider_rw.commit()?;
debug!(target: "engine::persistence", tip=?last.number, "Finished pruning after saving blocks");
self.metrics.prune_before_duration_seconds.record(prune_start.elapsed());
}
provider_rw.commit()?;
}
debug!(target: "engine::persistence", first=?first_block, last=?last_block, "Saved range of blocks");
let elapsed = start_time.elapsed();
self.metrics.save_blocks_batch_size.record(block_count as f64);
self.metrics.save_blocks_duration_seconds.record(elapsed);
@@ -454,4 +461,52 @@ mod tests {
assert_eq!(last_hash, result.last_block.unwrap().hash);
}
}
/// Verifies that committing `save_blocks` history before running the pruner
/// prevents the pruner from overwriting new entries.
///
/// Previously, both `save_blocks` and the pruner pushed `RocksDB` batches before
/// a single commit. Both read committed state, so the pruner didn't see the
/// new entries and its batch overwrote them. The fix commits `save_blocks`
/// first, then runs the pruner against committed state in a separate provider.
#[test]
fn test_save_blocks_then_prune_preserves_new_history() {
use reth_db::{models::ShardedKey, tables, BlockNumberList};
use reth_provider::RocksDBProviderFactory;
reth_tracing::init_test_tracing();
let provider_factory = create_test_provider_factory();
let tracked_addr = alloy_primitives::Address::from([0xBE; 20]);
// Phase 1: Establish baseline history for blocks 0..20.
let rocksdb = provider_factory.rocksdb_provider();
{
let mut batch = rocksdb.batch();
let initial_blocks: Vec<u64> = (0..20).collect();
let shard = BlockNumberList::new_pre_sorted(initial_blocks.iter().copied());
batch
.put::<tables::AccountsHistory>(ShardedKey::new(tracked_addr, u64::MAX), &shard)
.unwrap();
batch.commit().unwrap();
}
// Phase 2: Simulate the fixed on_save_blocks flow.
// Step 1: save_blocks appends new entries 20..25 and commits immediately.
let mut batch1 = rocksdb.batch();
batch1.append_account_history_shard(tracked_addr, 20..25u64).unwrap();
batch1.commit().unwrap();
// Step 2: Pruner runs AFTER commit, so it reads state that includes 20..25.
// Prunes entries ≤ 14, leaving [15..25).
let mut batch2 = rocksdb.batch();
batch2.prune_account_history_to(tracked_addr, 14).unwrap();
batch2.commit().unwrap();
// Verify new entries survived pruning.
let shards = rocksdb.account_history_shards(tracked_addr).unwrap();
let entries: Vec<u64> = shards.iter().flat_map(|(_, list)| list.iter()).collect();
let expected: Vec<u64> = (15..25).collect();
assert_eq!(entries, expected, "new entries 20..25 must survive pruning");
}
}

View File

@@ -791,15 +791,10 @@ where
// If the canonical chain is ahead of the new chain,
// gather all blocks until new head number.
while current_canonical_number > current_number {
if let Some(block) = self.canonical_block_by_hash(old_hash)? {
old_hash = block.recovered_block().parent_hash();
old_chain.push(block);
current_canonical_number -= 1;
} else {
// This shouldn't happen as we're walking back the canonical chain
warn!(target: "engine::tree", current_hash=?old_hash, "Canonical block not found in TreeState");
return Ok(None)
}
let block = self.canonical_block_by_hash(old_hash)?;
old_hash = block.recovered_block().parent_hash();
old_chain.push(block);
current_canonical_number -= 1;
}
// Both new and old chain pointers are now at the same height.
@@ -808,14 +803,9 @@ where
// Walk both chains from specified hashes at same height until
// a common ancestor (fork block) is reached.
while old_hash != current_hash {
if let Some(block) = self.canonical_block_by_hash(old_hash)? {
old_hash = block.recovered_block().parent_hash();
old_chain.push(block);
} else {
// This shouldn't happen as we're walking back the canonical chain
warn!(target: "engine::tree", current_hash=?old_hash, "Canonical block not found in TreeState");
return Ok(None)
}
let block = self.canonical_block_by_hash(old_hash)?;
old_hash = block.recovered_block().parent_hash();
old_chain.push(block);
if let Some(block) = self.state.tree_state.executed_block_by_hash(current_hash).cloned()
{
@@ -942,36 +932,22 @@ where
let new_head_hash = canonical_header.hash();
let new_head_number = canonical_header.number();
// Try to load the canonical ancestor's block
match self.canonical_block_by_hash(new_head_hash)? {
Some(executed_block) => {
// Perform the reorg to properly handle the unwind
self.canonical_in_memory_state.update_chain(NewCanonicalChain::Reorg {
new: vec![executed_block],
old: old_blocks,
});
// Load the canonical ancestor's block
let executed_block = self.canonical_block_by_hash(new_head_hash)?;
// Perform the reorg to properly handle the unwind
self.canonical_in_memory_state
.update_chain(NewCanonicalChain::Reorg { new: vec![executed_block], old: old_blocks });
// CRITICAL: Update the canonical head after the reorg
// This ensures get_canonical_head() returns the correct block
self.canonical_in_memory_state.set_canonical_head(canonical_header.clone());
// CRITICAL: Update the canonical head after the reorg
// This ensures get_canonical_head() returns the correct block
self.canonical_in_memory_state.set_canonical_head(canonical_header.clone());
debug!(
target: "engine::tree",
block_number = new_head_number,
block_hash = ?new_head_hash,
"Successfully loaded canonical ancestor into memory via reorg"
);
}
None => {
// Fallback: update header only if block cannot be found
warn!(
target: "engine::tree",
block_hash = ?new_head_hash,
"Could not find canonical ancestor block, updating header only"
);
self.canonical_in_memory_state.set_canonical_head(canonical_header.clone());
}
}
debug!(
target: "engine::tree",
block_number = new_head_number,
block_hash = ?new_head_hash,
"Successfully loaded canonical ancestor into memory via reorg"
);
Ok(())
}
@@ -997,18 +973,17 @@ where
return Ok(());
}
// Try to load the block from storage
if let Some(executed_block) = self.canonical_block_by_hash(block_hash)? {
self.canonical_in_memory_state
.update_chain(NewCanonicalChain::Commit { new: vec![executed_block] });
// Load the block from storage
let executed_block = self.canonical_block_by_hash(block_hash)?;
self.canonical_in_memory_state
.update_chain(NewCanonicalChain::Commit { new: vec![executed_block] });
debug!(
target: "engine::tree",
block_number,
block_hash = ?block_hash,
"Added canonical block to in-memory state"
);
}
debug!(
target: "engine::tree",
block_number,
block_hash = ?block_hash,
"Added canonical block to in-memory state"
);
Ok(())
}
@@ -2029,11 +2004,11 @@ where
/// pruned for a given block, this operation will return an error. On archive nodes, it
/// can retrieve any block.
#[instrument(level = "debug", target = "engine::tree", skip(self))]
fn canonical_block_by_hash(&self, hash: B256) -> ProviderResult<Option<ExecutedBlock<N>>> {
fn canonical_block_by_hash(&self, hash: B256) -> ProviderResult<ExecutedBlock<N>> {
trace!(target: "engine::tree", ?hash, "Fetching executed block by hash");
// check memory first
if let Some(block) = self.state.tree_state.executed_block_by_hash(hash) {
return Ok(Some(block.clone()))
return Ok(block.clone())
}
let (block, senders) = self
@@ -2075,11 +2050,22 @@ where
},
});
Ok(Some(ExecutedBlock::new(
Ok(ExecutedBlock::new(
Arc::new(RecoveredBlock::new_sealed(block, senders)),
execution_output,
trie_data,
)))
))
}
/// Returns `true` if a block with the given hash is known, either in memory or in the
/// database. This is a lightweight existence check that avoids constructing a full
/// [`SealedHeader`].
fn has_block_by_hash(&self, hash: B256) -> ProviderResult<bool> {
if self.state.tree_state.contains_hash(&hash) {
Ok(true)
} else {
self.provider.is_known(hash)
}
}
/// Return sealed block header from in-memory state or database by hash.
@@ -2126,7 +2112,7 @@ where
parent_hash: B256,
) -> ProviderResult<Option<B256>> {
// Check if parent exists in side chain or in canonical chain.
if self.sealed_header_by_hash(parent_hash)?.is_some() {
if self.has_block_by_hash(parent_hash)? {
return Ok(Some(parent_hash))
}
@@ -2140,7 +2126,7 @@ where
// If current_header is None, then the current_hash does not have an invalid
// ancestor in the cache, check its presence in blockchain tree
if current_block.is_none() && self.sealed_header_by_hash(current_hash)?.is_some() {
if current_block.is_none() && self.has_block_by_hash(current_hash)? {
return Ok(Some(current_hash))
}
}
@@ -2807,7 +2793,7 @@ where
debug!(target: "engine::tree", block=?block_num_hash, parent = ?block_id.parent, "Inserting new block into tree");
// Check if block already exists - first in memory, then DB only if it could be persisted
if self.state.tree_state.sealed_header_by_hash(&block_num_hash.hash).is_some() {
if self.state.tree_state.contains_hash(&block_num_hash.hash) {
convert_to_block(self, input)?;
return Ok(InsertPayloadOk::AlreadySeen(BlockStatus::Valid));
}

View File

@@ -55,12 +55,14 @@ use tracing::{debug, debug_span, instrument, warn, Span};
pub mod bal;
pub mod multiproof;
mod preserved_sparse_trie;
pub mod preserved_sparse_trie;
pub mod prewarm;
pub mod receipt_root_task;
pub mod sparse_trie;
use preserved_sparse_trie::{PreservedSparseTrie, SharedPreservedSparseTrie};
pub use preserved_sparse_trie::{
PreservedSparseTrie, PreservedTrieGuard, SharedPreservedSparseTrie, SparseTrie,
};
/// Default parallelism thresholds to use with the [`ParallelSparseTrie`].
///
@@ -1024,7 +1026,7 @@ impl PayloadExecutionCache {
/// - It exists and matches the requested parent hash
/// - No other tasks are currently using it (checked via Arc reference count)
#[instrument(level = "debug", target = "engine::tree::payload_processor", skip(self))]
pub(crate) fn get_cache_for(&self, parent_hash: B256) -> Option<SavedCache> {
pub fn get_cache_for(&self, parent_hash: B256) -> Option<SavedCache> {
let start = Instant::now();
let mut cache = self.inner.write();

View File

@@ -7,25 +7,25 @@ use std::{sync::Arc, time::Instant};
use tracing::debug;
/// Type alias for the sparse trie type used in preservation.
pub(super) type SparseTrie = SparseStateTrie<ConfigurableSparseTrie, ConfigurableSparseTrie>;
pub type SparseTrie = SparseStateTrie<ConfigurableSparseTrie, ConfigurableSparseTrie>;
/// Shared handle to a preserved sparse trie that can be reused across payload validations.
///
/// This is stored in [`PayloadProcessor`](super::PayloadProcessor) and cloned to pass to
/// [`SparseTrieCacheTask`](super::sparse_trie::SparseTrieCacheTask) for trie reuse.
#[derive(Debug, Default, Clone)]
pub(super) struct SharedPreservedSparseTrie(Arc<Mutex<Option<PreservedSparseTrie>>>);
pub struct SharedPreservedSparseTrie(Arc<Mutex<Option<PreservedSparseTrie>>>);
impl SharedPreservedSparseTrie {
/// Takes the preserved trie if present, leaving `None` in its place.
pub(super) fn take(&self) -> Option<PreservedSparseTrie> {
pub fn take(&self) -> Option<PreservedSparseTrie> {
self.0.lock().take()
}
/// Acquires a guard that blocks `take()` until dropped.
/// Use this before sending the state root result to ensure the next block
/// waits for the trie to be stored.
pub(super) fn lock(&self) -> PreservedTrieGuard<'_> {
pub fn lock(&self) -> PreservedTrieGuard<'_> {
PreservedTrieGuard(self.0.lock())
}
@@ -36,7 +36,7 @@ impl SharedPreservedSparseTrie {
/// before starting payload processing.
///
/// Returns the time spent waiting for the lock.
pub(super) fn wait_for_availability(&self) -> std::time::Duration {
pub fn wait_for_availability(&self) -> std::time::Duration {
let start = Instant::now();
let _guard = self.0.lock();
let elapsed = start.elapsed();
@@ -53,11 +53,11 @@ impl SharedPreservedSparseTrie {
/// Guard that holds the lock on the preserved trie.
/// While held, `take()` will block. Call `store()` to save the trie before dropping.
pub(super) struct PreservedTrieGuard<'a>(parking_lot::MutexGuard<'a, Option<PreservedSparseTrie>>);
pub struct PreservedTrieGuard<'a>(parking_lot::MutexGuard<'a, Option<PreservedSparseTrie>>);
impl PreservedTrieGuard<'_> {
/// Stores a preserved trie for later reuse.
pub(super) fn store(&mut self, trie: PreservedSparseTrie) {
pub fn store(&mut self, trie: PreservedSparseTrie) {
self.0.replace(trie);
}
}
@@ -69,7 +69,7 @@ impl PreservedTrieGuard<'_> {
/// matches the anchor.
/// - **Cleared**: Trie data has been cleared but allocations are preserved for reuse.
#[derive(Debug)]
pub(super) enum PreservedSparseTrie {
pub enum PreservedSparseTrie {
/// Trie with a computed state root that can be reused for continuation payloads.
Anchored {
/// The sparse state trie (pruned after root computation).
@@ -90,12 +90,12 @@ impl PreservedSparseTrie {
///
/// The `state_root` is the computed state root from the trie, which becomes the
/// anchor for determining if subsequent payloads can reuse this trie.
pub(super) const fn anchored(trie: SparseTrie, state_root: B256) -> Self {
pub const fn anchored(trie: SparseTrie, state_root: B256) -> Self {
Self::Anchored { trie, state_root }
}
/// Creates a cleared preserved trie (allocations preserved, data cleared).
pub(super) const fn cleared(trie: SparseTrie) -> Self {
pub const fn cleared(trie: SparseTrie) -> Self {
Self::Cleared { trie }
}
@@ -104,7 +104,7 @@ impl PreservedSparseTrie {
/// If the preserved trie is anchored and the parent state root matches, the pruned
/// trie structure is reused directly. Otherwise, the trie is cleared but allocations
/// are preserved to reduce memory overhead.
pub(super) fn into_trie_for(self, parent_state_root: B256) -> SparseTrie {
pub fn into_trie_for(self, parent_state_root: B256) -> SparseTrie {
match self {
Self::Anchored { trie, state_root } if state_root == parent_state_root => {
debug!(

View File

@@ -314,6 +314,12 @@ where
}
self.dispatch_pending_targets();
// If there's still no pending updates spend some time pre-computing the account
// trie upper hashes
if self.proof_result_rx.is_empty() {
self.trie.calculate_subtries();
}
} else if self.updates.is_empty() || self.pending_updates > MAX_PENDING_UPDATES {
// If we don't have any pending updates OR we've accumulated a lot already, apply
// them to the trie,

View File

@@ -74,6 +74,11 @@ impl<N: NodePrimitives> TreeState<N> {
self.blocks_by_hash.get(&hash)
}
/// Returns `true` if a block with the given hash exists in memory.
pub fn contains_hash(&self, hash: &B256) -> bool {
self.blocks_by_hash.contains_key(hash)
}
/// Returns the sealed block header by hash.
pub fn sealed_header_by_hash(&self, hash: &B256) -> Option<SealedHeader<N::BlockHeader>> {
self.blocks_by_hash.get(hash).map(|b| b.sealed_block().sealed_header().clone())

View File

@@ -4,7 +4,6 @@ use eyre::{eyre, OptionExt};
use futures_util::{stream::StreamExt, Stream, TryStreamExt};
use reqwest::{Client, IntoUrl, Url};
use reth_era::common::file_ops::EraFileType;
use reth_fs_util::FsPathError;
use sha2::{Digest, Sha256};
use std::{future::Future, path::Path, str::FromStr};
use tokio::{
@@ -137,7 +136,7 @@ impl<Http: HttpClient + Clone> EraClient<Http> {
let Some(number) = self.file_name_to_number(name) &&
(number < index || number >= last)
{
remove_file_ignore_not_found(entry.path())?;
reth_fs_util::remove_file_if_exists(entry.path())?;
}
}
}
@@ -322,16 +321,6 @@ impl<Http: HttpClient + Clone> EraClient<Http> {
}
}
fn remove_file_ignore_not_found(path: impl AsRef<Path>) -> eyre::Result<()> {
match reth_fs_util::remove_file(path) {
Ok(()) => Ok(()),
Err(FsPathError::RemoveFile { source, .. }) if source.kind() == io::ErrorKind::NotFound => {
Ok(())
}
Err(err) => Err(err.into()),
}
}
async fn checksum(mut reader: impl AsyncRead + Unpin) -> eyre::Result<Vec<u8>> {
let mut hasher = Sha256::new();
@@ -378,25 +367,4 @@ mod tests {
assert_eq!(actual_number, expected_number);
}
#[test]
fn test_remove_file_ignore_not_found() {
let temp_dir = tempfile::tempdir().unwrap();
let path = temp_dir.path().join("missing.era1");
assert!(remove_file_ignore_not_found(&path).is_ok());
}
#[test]
fn test_remove_file_ignore_not_found_preserves_other_errors() {
let temp_dir = tempfile::tempdir().unwrap();
let path = temp_dir.path().join("dir");
std::fs::create_dir_all(&path).unwrap();
let err = remove_file_ignore_not_found(&path).unwrap_err();
assert!(matches!(
err.downcast_ref::<FsPathError>(),
Some(FsPathError::RemoveFile { source, .. }) if source.kind() != io::ErrorKind::NotFound
));
}
}

View File

@@ -36,8 +36,6 @@ tracing.workspace = true
tempfile.workspace = true
[features]
default = []
otlp = ["reth-tracing/otlp", "reth-node-core/otlp"]
otlp-logs = ["reth-tracing/otlp-logs", "reth-node-core/otlp-logs"]
@@ -89,6 +87,3 @@ min-trace-logs = [
"tracing/release_max_level_trace",
"reth-node-core/min-trace-logs",
]
rocksdb = ["reth-cli-commands/rocksdb"]
edge = ["rocksdb"]

View File

@@ -27,7 +27,9 @@ reth-evm.workspace = true
reth-evm-ethereum = { workspace = true, features = ["std"] }
reth-errors.workspace = true
reth-chainspec.workspace = true
reth-engine-tree.workspace = true
reth-payload-validator.workspace = true
reth-trie.workspace = true
# ethereum
alloy-rlp.workspace = true

View File

@@ -0,0 +1,593 @@
//! Prototype payload builder that uses engine-tree caches (execution cache, precompile cache,
//! sparse trie) to speed up block building.
//!
//! This is NOT wired into the node builder -- it exists to prototype the API surface needed
//! to share engine-tree caches with the payload builder.
use alloy_consensus::Transaction;
use alloy_primitives::U256;
use alloy_rlp::Encodable;
use reth_basic_payload_builder::{
is_better_payload, BuildArguments, BuildOutcome, MissingPayloadBehaviour, PayloadBuilder,
PayloadConfig,
};
use reth_chainspec::{ChainSpecProvider, EthChainSpec, EthereumHardforks};
use reth_consensus_common::validation::MAX_RLP_BLOCK_SIZE;
use reth_engine_tree::tree::{
precompile_cache::{CachedPrecompile, PrecompileCacheMap},
CachedStateMetrics, CachedStateProvider, ExecutionCache, PayloadExecutionCache,
PreservedSparseTrie, SharedPreservedSparseTrie,
};
use reth_errors::{BlockExecutionError, BlockValidationError, ConsensusError};
use reth_ethereum_primitives::{EthPrimitives, TransactionSigned};
use reth_evm::{
execute::{BlockBuilder, BlockBuilderOutcome},
ConfigureEvm, Evm, NextBlockEnvAttributes, SpecFor,
};
use reth_evm_ethereum::EthEvmConfig;
use reth_payload_builder::{BlobSidecars, EthBuiltPayload, EthPayloadBuilderAttributes};
use reth_payload_builder_primitives::PayloadBuilderError;
use reth_payload_primitives::PayloadBuilderAttributes;
use reth_primitives_traits::transaction::error::InvalidTransactionError;
use reth_revm::{database::StateProviderDatabase, db::State};
use reth_storage_api::{StateProvider, StateProviderFactory};
use reth_trie::{updates::TrieUpdates, HashedPostState, HashedStorage, MultiProof, TrieInput};
use reth_transaction_pool::{
error::{Eip4844PoolTransactionError, InvalidPoolTransactionError},
BestTransactions, BestTransactionsAttributes, PoolTransaction, TransactionPool,
ValidPoolTransaction,
};
use revm::context_interface::Block as _;
use std::sync::Arc;
use tracing::{debug, trace, warn};
use crate::EthereumBuilderConfig;
/// Default cross-block cache size (256 MB) used when no engine cache is available.
const DEFAULT_CACHE_SIZE: usize = 256 * 1024 * 1024;
type BestTransactionsIter<Pool> = Box<
dyn BestTransactions<Item = Arc<ValidPoolTransaction<<Pool as TransactionPool>::Transaction>>>,
>;
/// Prototype Ethereum payload builder that leverages engine-tree caches.
///
/// Holds references to the three engine caches:
/// - **Execution cache**: warm account/storage/bytecode data from prior block execution
/// - **Precompile cache**: cached precompile results across blocks
/// - **Sparse trie**: preserved sparse trie for faster state root computation
#[derive(Debug, Clone)]
pub struct EthereumPayloadBuilder2<Pool, Client, EvmConfig = EthEvmConfig>
where
EvmConfig: ConfigureEvm,
{
/// Client providing access to node state.
client: Client,
/// Transaction pool.
pool: Pool,
/// The type responsible for creating the evm.
evm_config: EvmConfig,
/// Payload builder configuration.
builder_config: EthereumBuilderConfig,
/// Engine execution cache (Arc-backed, cheap to clone).
execution_cache: PayloadExecutionCache,
/// Engine precompile cache map (Arc-backed, cheap to clone).
precompile_cache_map: PrecompileCacheMap<SpecFor<EvmConfig>>,
/// Engine sparse trie (Arc-backed, cheap to clone).
sparse_trie: SharedPreservedSparseTrie,
}
impl<Pool, Client, EvmConfig> EthereumPayloadBuilder2<Pool, Client, EvmConfig>
where
EvmConfig: ConfigureEvm,
{
/// Creates a new `EthereumPayloadBuilder2`.
pub fn new(
client: Client,
pool: Pool,
evm_config: EvmConfig,
builder_config: EthereumBuilderConfig,
execution_cache: PayloadExecutionCache,
precompile_cache_map: PrecompileCacheMap<SpecFor<EvmConfig>>,
sparse_trie: SharedPreservedSparseTrie,
) -> Self {
Self {
client,
pool,
evm_config,
builder_config,
execution_cache,
precompile_cache_map,
sparse_trie,
}
}
}
impl<Pool, Client, EvmConfig> PayloadBuilder for EthereumPayloadBuilder2<Pool, Client, EvmConfig>
where
EvmConfig: ConfigureEvm<Primitives = EthPrimitives, NextBlockEnvCtx = NextBlockEnvAttributes>,
Client: StateProviderFactory + ChainSpecProvider<ChainSpec: EthereumHardforks> + Clone,
Pool: TransactionPool<Transaction: PoolTransaction<Consensus = TransactionSigned>>,
{
type Attributes = EthPayloadBuilderAttributes;
type BuiltPayload = EthBuiltPayload;
fn try_build(
&self,
args: BuildArguments<EthPayloadBuilderAttributes, EthBuiltPayload>,
) -> Result<BuildOutcome<EthBuiltPayload>, PayloadBuilderError> {
cached_ethereum_payload(
self.evm_config.clone(),
self.client.clone(),
self.pool.clone(),
self.builder_config.clone(),
args,
|attributes| self.pool.best_transactions_with_attributes(attributes),
&self.execution_cache,
&self.precompile_cache_map,
&self.sparse_trie,
)
}
fn on_missing_payload(
&self,
_args: BuildArguments<Self::Attributes, Self::BuiltPayload>,
) -> MissingPayloadBehaviour<Self::BuiltPayload> {
if self.builder_config.await_payload_on_missing {
MissingPayloadBehaviour::AwaitInProgress
} else {
MissingPayloadBehaviour::RaceEmptyPayload
}
}
fn build_empty_payload(
&self,
config: PayloadConfig<Self::Attributes>,
) -> Result<EthBuiltPayload, PayloadBuilderError> {
let args = BuildArguments::new(Default::default(), config, Default::default(), None);
cached_ethereum_payload(
self.evm_config.clone(),
self.client.clone(),
self.pool.clone(),
self.builder_config.clone(),
args,
|attributes| self.pool.best_transactions_with_attributes(attributes),
&self.execution_cache,
&self.precompile_cache_map,
&self.sparse_trie,
)?
.into_payload()
.ok_or_else(|| PayloadBuilderError::MissingPayload)
}
}
/// Constructs an Ethereum transaction payload using engine-tree caches.
///
/// This is identical to [`default_ethereum_payload`](crate::default_ethereum_payload) except:
///
/// **Phase 1** - Execution cache + precompile cache:
/// - Uses `CachedStateProvider` wrapping the state provider with the engine's execution cache
/// - Wraps EVM precompiles with `CachedPrecompile` using the engine's precompile cache
///
/// **Phase 2** - Sparse trie state root:
/// - Takes the preserved sparse trie before building
/// - Uses it to compute the state root instead of the slow `state_root_with_updates()`
#[allow(clippy::too_many_arguments)]
pub fn cached_ethereum_payload<EvmConfig, Client, Pool, F>(
evm_config: EvmConfig,
client: Client,
pool: Pool,
builder_config: EthereumBuilderConfig,
args: BuildArguments<EthPayloadBuilderAttributes, EthBuiltPayload>,
best_txs: F,
execution_cache: &PayloadExecutionCache,
precompile_cache_map: &PrecompileCacheMap<SpecFor<EvmConfig>>,
sparse_trie: &SharedPreservedSparseTrie,
) -> Result<BuildOutcome<EthBuiltPayload>, PayloadBuilderError>
where
EvmConfig: ConfigureEvm<Primitives = EthPrimitives, NextBlockEnvCtx = NextBlockEnvAttributes>,
Client: StateProviderFactory + ChainSpecProvider<ChainSpec: EthereumHardforks>,
Pool: TransactionPool<Transaction: PoolTransaction<Consensus = TransactionSigned>>,
F: FnOnce(BestTransactionsAttributes) -> BestTransactionsIter<Pool>,
{
let BuildArguments { mut cached_reads, config, cancel, best_payload } = args;
let PayloadConfig { parent_header, attributes } = config;
let state_provider = client.state_by_block_hash(parent_header.hash())?;
// --- Phase 1: Execution cache ---
// Try to get a warm cache from the engine's execution cache for the parent block.
// If unavailable, create a fresh (empty) cache — `CachedStateProvider` will still
// function correctly, just with no pre-warmed data.
let (caches, metrics) = if let Some(saved) = execution_cache.get_cache_for(parent_header.hash())
{
debug!(target: "payload_builder", "using engine execution cache for parent");
(saved.cache().clone(), saved.metrics().clone())
} else {
debug!(target: "payload_builder", "no engine execution cache available, using fresh cache");
(ExecutionCache::new(DEFAULT_CACHE_SIZE), CachedStateMetrics::zeroed())
};
let cached_state = CachedStateProvider::new(state_provider.as_ref(), caches, metrics);
let state = StateProviderDatabase::new(&cached_state);
let mut db =
State::builder().with_database(cached_reads.as_db_mut(state)).with_bundle_update().build();
let next_block_attrs = NextBlockEnvAttributes {
timestamp: attributes.timestamp(),
suggested_fee_recipient: attributes.suggested_fee_recipient(),
prev_randao: attributes.prev_randao(),
gas_limit: builder_config.gas_limit(parent_header.gas_limit),
parent_beacon_block_root: attributes.parent_beacon_block_root(),
withdrawals: Some(attributes.withdrawals().clone()),
extra_data: builder_config.extra_data,
};
// --- Phase 1: Precompile cache ---
// Get spec_id before creating the builder, to properly key cached precompile results.
let spec_id = *evm_config
.next_evm_env(&parent_header, &next_block_attrs)
.map_err(PayloadBuilderError::other)?
.spec_id();
let mut builder = evm_config
.builder_for_next_block(&mut db, &parent_header, next_block_attrs)
.map_err(PayloadBuilderError::other)?;
builder.evm_mut().precompiles_mut().map_cacheable_precompiles(|address, precompile| {
CachedPrecompile::wrap(
precompile,
precompile_cache_map.cache_for_address(*address),
spec_id,
None,
)
});
let chain_spec = client.chain_spec();
debug!(target: "payload_builder", id=%attributes.id, parent_header = ?parent_header.hash(), parent_number = parent_header.number, "building new payload (cached)");
let mut cumulative_gas_used = 0;
let block_gas_limit: u64 = builder.evm_mut().block().gas_limit();
let base_fee = builder.evm_mut().block().basefee();
let mut best_txs = best_txs(BestTransactionsAttributes::new(
base_fee,
builder.evm_mut().block().blob_gasprice().map(|gasprice| gasprice as u64),
));
let mut total_fees = U256::ZERO;
builder.apply_pre_execution_changes().map_err(|err| {
warn!(target: "payload_builder", %err, "failed to apply pre-execution changes");
PayloadBuilderError::Internal(err.into())
})?;
let mut blob_sidecars = BlobSidecars::Empty;
let mut block_blob_count = 0;
let mut block_transactions_rlp_length = 0;
let blob_params = chain_spec.blob_params_at_timestamp(attributes.timestamp);
let protocol_max_blob_count =
blob_params.as_ref().map(|params| params.max_blob_count).unwrap_or_else(Default::default);
let max_blob_count = builder_config
.max_blobs_per_block
.map(|user_limit| std::cmp::min(user_limit, protocol_max_blob_count).max(1))
.unwrap_or(protocol_max_blob_count);
let is_osaka = chain_spec.is_osaka_active_at_timestamp(attributes.timestamp);
let withdrawals_rlp_length = attributes.withdrawals().length();
// --- Transaction execution loop (identical to default_ethereum_payload) ---
while let Some(pool_tx) = best_txs.next() {
if cumulative_gas_used + pool_tx.gas_limit() > block_gas_limit {
best_txs.mark_invalid(
&pool_tx,
&InvalidPoolTransactionError::ExceedsGasLimit(pool_tx.gas_limit(), block_gas_limit),
);
continue
}
if cancel.is_cancelled() {
return Ok(BuildOutcome::Cancelled)
}
let tx = pool_tx.to_consensus();
let tx_rlp_len = tx.inner().length();
let estimated_block_size_with_tx =
block_transactions_rlp_length + tx_rlp_len + withdrawals_rlp_length + 1024;
if is_osaka && estimated_block_size_with_tx > MAX_RLP_BLOCK_SIZE {
best_txs.mark_invalid(
&pool_tx,
&InvalidPoolTransactionError::OversizedData {
size: estimated_block_size_with_tx,
limit: MAX_RLP_BLOCK_SIZE,
},
);
continue
}
let mut blob_tx_sidecar = None;
if let Some(blob_hashes) = tx.blob_versioned_hashes() {
let tx_blob_count = blob_hashes.len() as u64;
if block_blob_count + tx_blob_count > max_blob_count {
trace!(target: "payload_builder", tx=?tx.hash(), ?block_blob_count, "skipping blob transaction because it would exceed the max blob count per block");
best_txs.mark_invalid(
&pool_tx,
&InvalidPoolTransactionError::Eip4844(
Eip4844PoolTransactionError::TooManyEip4844Blobs {
have: block_blob_count + tx_blob_count,
permitted: max_blob_count,
},
),
);
continue
}
let blob_sidecar_result = 'sidecar: {
let Some(sidecar) =
pool.get_blob(*tx.hash()).map_err(PayloadBuilderError::other)?
else {
break 'sidecar Err(Eip4844PoolTransactionError::MissingEip4844BlobSidecar)
};
if is_osaka {
if sidecar.is_eip7594() {
Ok(sidecar)
} else {
Err(Eip4844PoolTransactionError::UnexpectedEip4844SidecarAfterOsaka)
}
} else if sidecar.is_eip4844() {
Ok(sidecar)
} else {
Err(Eip4844PoolTransactionError::UnexpectedEip7594SidecarBeforeOsaka)
}
};
blob_tx_sidecar = match blob_sidecar_result {
Ok(sidecar) => Some(sidecar),
Err(error) => {
best_txs.mark_invalid(&pool_tx, &InvalidPoolTransactionError::Eip4844(error));
continue
}
};
}
let gas_used = match builder.execute_transaction(tx.clone()) {
Ok(gas_used) => gas_used,
Err(BlockExecutionError::Validation(BlockValidationError::InvalidTx {
error, ..
})) => {
if error.is_nonce_too_low() {
trace!(target: "payload_builder", %error, ?tx, "skipping nonce too low transaction");
} else {
trace!(target: "payload_builder", %error, ?tx, "skipping invalid transaction and its descendants");
best_txs.mark_invalid(
&pool_tx,
&InvalidPoolTransactionError::Consensus(
InvalidTransactionError::TxTypeNotSupported,
),
);
}
continue
}
Err(err) => return Err(PayloadBuilderError::evm(err)),
};
if let Some(blob_hashes) = tx.blob_versioned_hashes() {
block_blob_count += blob_hashes.len() as u64;
if block_blob_count == max_blob_count {
best_txs.skip_blobs();
}
}
block_transactions_rlp_length += tx_rlp_len;
let miner_fee =
tx.effective_tip_per_gas(base_fee).expect("fee is always valid; execution succeeded");
total_fees += U256::from(miner_fee) * U256::from(gas_used);
cumulative_gas_used += gas_used;
if let Some(sidecar) = blob_tx_sidecar {
blob_sidecars.push_sidecar_variant(sidecar.as_ref().clone());
}
}
// check if we have a better block
if !is_better_payload(best_payload.as_ref(), total_fees) {
drop(builder);
return Ok(BuildOutcome::Aborted { fees: total_fees, cached_reads })
}
// --- Phase 2: Sparse trie state root ---
// Take the preserved sparse trie before finishing. The wrapper's
// `state_root_with_updates` will use it instead of the slow full trie computation.
let preserved = sparse_trie.take();
let wrapper = SparseTrieStateProvider { inner: state_provider.as_ref(), preserved };
let BlockBuilderOutcome { execution_result, block, .. } = builder.finish(wrapper)?;
let requests = chain_spec
.is_prague_active_at_timestamp(attributes.timestamp)
.then_some(execution_result.requests);
let sealed_block = Arc::new(block.into_sealed_block());
debug!(target: "payload_builder", id=%attributes.id, sealed_block_header = ?sealed_block.sealed_header(), "sealed built block (cached)");
if is_osaka && sealed_block.rlp_length() > MAX_RLP_BLOCK_SIZE {
return Err(PayloadBuilderError::other(ConsensusError::BlockTooLarge {
rlp_length: sealed_block.rlp_length(),
max_rlp_length: MAX_RLP_BLOCK_SIZE,
}));
}
let payload = EthBuiltPayload::new(attributes.id, sealed_block, total_fees, requests)
.with_sidecars(blob_sidecars);
Ok(BuildOutcome::Better { payload, cached_reads })
}
/// A state provider wrapper that holds a preserved sparse trie for
/// faster state root computation.
///
/// All `StateProvider` trait methods delegate to the inner provider. The
/// `state_root_with_updates` method is the hook point for sparse trie integration.
struct SparseTrieStateProvider<'a> {
inner: &'a dyn StateProvider,
/// The preserved sparse trie, taken from the shared handle before building.
#[allow(dead_code)]
preserved: Option<PreservedSparseTrie>,
}
// --- Delegate all StateProvider trait methods to inner ---
impl reth_storage_api::AccountReader for SparseTrieStateProvider<'_> {
fn basic_account(
&self,
address: &alloy_primitives::Address,
) -> reth_errors::ProviderResult<Option<reth_primitives_traits::Account>> {
self.inner.basic_account(address)
}
}
impl reth_storage_api::BlockHashReader for SparseTrieStateProvider<'_> {
fn block_hash(
&self,
number: alloy_primitives::BlockNumber,
) -> reth_errors::ProviderResult<Option<alloy_primitives::B256>> {
self.inner.block_hash(number)
}
fn canonical_hashes_range(
&self,
start: alloy_primitives::BlockNumber,
end: alloy_primitives::BlockNumber,
) -> reth_errors::ProviderResult<Vec<alloy_primitives::B256>> {
self.inner.canonical_hashes_range(start, end)
}
}
impl reth_storage_api::BytecodeReader for SparseTrieStateProvider<'_> {
fn bytecode_by_hash(
&self,
code_hash: &alloy_primitives::B256,
) -> reth_errors::ProviderResult<Option<reth_primitives_traits::Bytecode>> {
self.inner.bytecode_by_hash(code_hash)
}
}
impl reth_storage_api::StateRootProvider for SparseTrieStateProvider<'_> {
fn state_root(
&self,
hashed_state: HashedPostState,
) -> reth_errors::ProviderResult<alloy_primitives::B256> {
self.inner.state_root(hashed_state)
}
fn state_root_from_nodes(
&self,
input: TrieInput,
) -> reth_errors::ProviderResult<alloy_primitives::B256> {
self.inner.state_root_from_nodes(input)
}
fn state_root_with_updates(
&self,
hashed_state: HashedPostState,
) -> reth_errors::ProviderResult<(alloy_primitives::B256, TrieUpdates)> {
// Phase 2: Hook point for sparse trie state root computation.
//
// Currently falls through to the standard computation. The sparse trie integration
// requires the multiproof pipeline which is complex to wire synchronously.
//
// Future implementation:
// 1. let targets = hashed_state.multi_proof_targets();
// 2. let multiproof = self.inner.multiproof(TrieInput::default(), targets)?;
// 3. sparse_trie.reveal_multiproof(multiproof)?;
// 4. // update account/storage leaves from hashed_state
// 5. let (root, updates) = sparse_trie.root_with_updates(provider)?;
// 6. return Ok((root, updates));
self.inner.state_root_with_updates(hashed_state)
}
fn state_root_from_nodes_with_updates(
&self,
input: TrieInput,
) -> reth_errors::ProviderResult<(alloy_primitives::B256, TrieUpdates)> {
self.inner.state_root_from_nodes_with_updates(input)
}
}
impl reth_storage_api::StorageRootProvider for SparseTrieStateProvider<'_> {
fn storage_root(
&self,
address: alloy_primitives::Address,
hashed_storage: HashedStorage,
) -> reth_errors::ProviderResult<alloy_primitives::B256> {
self.inner.storage_root(address, hashed_storage)
}
fn storage_proof(
&self,
address: alloy_primitives::Address,
slot: alloy_primitives::B256,
hashed_storage: HashedStorage,
) -> reth_errors::ProviderResult<reth_trie::StorageProof> {
self.inner.storage_proof(address, slot, hashed_storage)
}
fn storage_multiproof(
&self,
address: alloy_primitives::Address,
slots: &[alloy_primitives::B256],
hashed_storage: HashedStorage,
) -> reth_errors::ProviderResult<reth_trie::StorageMultiProof> {
self.inner.storage_multiproof(address, slots, hashed_storage)
}
}
impl reth_storage_api::StateProofProvider for SparseTrieStateProvider<'_> {
fn proof(
&self,
input: TrieInput,
address: alloy_primitives::Address,
slots: &[alloy_primitives::B256],
) -> reth_errors::ProviderResult<reth_trie::AccountProof> {
self.inner.proof(input, address, slots)
}
fn multiproof(
&self,
input: TrieInput,
targets: reth_trie::MultiProofTargets,
) -> reth_errors::ProviderResult<MultiProof> {
self.inner.multiproof(input, targets)
}
fn witness(
&self,
input: TrieInput,
target: HashedPostState,
) -> reth_errors::ProviderResult<Vec<alloy_primitives::Bytes>> {
self.inner.witness(input, target)
}
}
impl reth_storage_api::HashedPostStateProvider for SparseTrieStateProvider<'_> {
fn hashed_post_state(&self, bundle_state: &reth_revm::db::BundleState) -> HashedPostState {
self.inner.hashed_post_state(bundle_state)
}
}
impl StateProvider for SparseTrieStateProvider<'_> {
fn storage(
&self,
account: alloy_primitives::Address,
storage_key: alloy_primitives::StorageKey,
) -> reth_errors::ProviderResult<Option<alloy_primitives::StorageValue>> {
self.inner.storage(account, storage_key)
}
}

View File

@@ -40,6 +40,8 @@ use revm::context_interface::Block as _;
use std::sync::Arc;
use tracing::{debug, trace, warn};
pub mod builder2;
mod config;
pub use config::*;

View File

@@ -85,4 +85,7 @@ serde = [
"alloy-rpc-types-eth?/serde",
"rand/serde",
]
rpc = ["dep:alloy-rpc-types-eth"]
rpc = [
"dep:alloy-rpc-types-eth",
"alloy-rpc-types-eth?/serde",
]

View File

@@ -152,6 +152,7 @@ jemalloc = [
"reth-cli-util?/jemalloc",
"reth-ethereum-cli?/jemalloc",
"reth-node-core?/jemalloc",
"reth-provider?/jemalloc",
]
jemalloc-prof = [
"jemalloc",

View File

@@ -66,8 +66,6 @@ secp256k1.workspace = true
tempfile.workspace = true
[features]
default = []
edge = ["reth-provider/edge"]
serde = [
"reth-exex-types/serde",
"reth-revm/serde",

View File

@@ -260,6 +260,17 @@ pub fn remove_file(path: impl AsRef<Path>) -> Result<()> {
fs::remove_file(path).map_err(|err| FsPathError::remove_file(err, path))
}
/// Removes a file at the given path, ignoring the error if the file does not exist
/// (`ErrorKind::NotFound`).
pub fn remove_file_if_exists(path: impl AsRef<Path>) -> Result<()> {
match remove_file(&path) {
Err(FsPathError::RemoveFile { source, .. }) if source.kind() == io::ErrorKind::NotFound => {
Ok(())
}
result => result,
}
}
/// Wrapper for `std::fs::create_dir_all`
pub fn create_dir_all(path: impl AsRef<Path>) -> Result<()> {
let path = path.as_ref();

View File

@@ -63,6 +63,28 @@ impl EthStreamError {
}
}
/// Returns whether this error indicates a protocol breach on the receive side.
///
/// These are errors caused by the remote peer sending invalid or malformed data
/// that warrant disconnecting with [`DisconnectReason::ProtocolBreach`].
pub const fn is_protocol_breach(&self) -> bool {
matches!(
self,
Self::InvalidMessage(_) |
Self::MessageTooBig(_) |
Self::TransactionHashesInvalidLenOfFields { .. } |
Self::UnsupportedMessage { .. } |
Self::P2PStreamError(
P2PStreamError::Rlp(_) |
P2PStreamError::Snap(_) |
P2PStreamError::MessageTooBig { .. } |
P2PStreamError::UnknownReservedMessageId(_) |
P2PStreamError::EmptyProtocolMessage |
P2PStreamError::UnknownDisconnectReason(_)
)
)
}
/// Returns the [`io::Error`] if it was caused by IO
pub const fn as_io(&self) -> Option<&io::Error> {
if let Self::P2PStreamError(P2PStreamError::Io(io)) = self {

View File

@@ -21,7 +21,7 @@ alloy-eip2124.workspace = true
# misc
serde = { workspace = true, optional = true }
humantime-serde = { workspace = true, optional = true }
serde_json = { workspace = true, features = ["std"] }
serde_json = { workspace = true, features = ["std"], optional = true }
# misc
tracing.workspace = true
@@ -30,6 +30,7 @@ tracing.workspace = true
serde = [
"dep:serde",
"dep:humantime-serde",
"dep:serde_json",
"alloy-eip2124/serde",
]
test-utils = []

View File

@@ -1,15 +1,9 @@
//! Configuration for peering.
use std::{
collections::HashSet,
io::{self, ErrorKind},
path::Path,
time::Duration,
};
use std::{collections::HashSet, time::Duration};
use reth_net_banlist::{BanList, IpFilter};
use reth_network_peers::{NodeRecord, TrustedPeer};
use tracing::info;
use crate::{peers::PersistedPeerInfo, BackoffKind, ReputationChangeWeights};
@@ -311,16 +305,16 @@ impl PeersConfig {
#[cfg(feature = "serde")]
pub fn with_basic_nodes_from_file(
mut self,
optional_file: Option<impl AsRef<Path>>,
) -> Result<Self, io::Error> {
optional_file: Option<impl AsRef<std::path::Path>>,
) -> Result<Self, std::io::Error> {
let Some(file_path) = optional_file else { return Ok(self) };
let raw = match std::fs::read_to_string(file_path.as_ref()) {
Ok(contents) => contents,
Err(e) if e.kind() == ErrorKind::NotFound => return Ok(self),
Err(e) if e.kind() == std::io::ErrorKind::NotFound => return Ok(self),
Err(e) => return Err(e),
};
info!(target: "net::peers", file = %file_path.as_ref().display(), "Loading saved peers");
tracing::info!(target: "net::peers", file = %file_path.as_ref().display(), "Loading saved peers");
// Try the new format first, fall back to legacy Vec<NodeRecord>
let peers: Vec<PersistedPeerInfo> = serde_json::from_str(&raw)
@@ -330,9 +324,9 @@ impl PeersConfig {
nodes.into_iter().map(PersistedPeerInfo::from_node_record).collect(),
)
})
.map_err(|e| io::Error::new(io::ErrorKind::InvalidData, e))?;
.map_err(|e| std::io::Error::new(std::io::ErrorKind::InvalidData, e))?;
info!(target: "net::peers", count = peers.len(), "Loaded persisted peers");
tracing::info!(target: "net::peers", count = peers.len(), "Loaded persisted peers");
self.persisted_peers = peers;
Ok(self)
}

View File

@@ -742,7 +742,9 @@ impl<N: NetworkPrimitives> Future for ActiveSession<N> {
}
OnIncomingMessageOutcome::BadMessage { error, message } => {
debug!(target: "net::session", %error, msg=?message, remote_peer_id=?this.remote_peer_id, "received invalid protocol message");
return this.close_on_error(error, cx)
this.on_bad_message();
return this
.try_disconnect(DisconnectReason::ProtocolBreach, cx)
}
OnIncomingMessageOutcome::NoCapacity(msg) => {
// failed to send due to lack of capacity
@@ -752,6 +754,10 @@ impl<N: NetworkPrimitives> Future for ActiveSession<N> {
}
Err(err) => {
debug!(target: "net::session", %err, remote_peer_id=?this.remote_peer_id, "failed to receive message");
if err.is_protocol_breach() {
this.on_bad_message();
return this.try_disconnect(DisconnectReason::ProtocolBreach, cx)
}
return this.close_on_error(err, cx)
}
}
@@ -966,6 +972,7 @@ mod tests {
GetBlockBodies, HelloMessageWithProtocols, P2PStream, StatusBuilder, UnauthedEthStream,
UnauthedP2PStream, UnifiedStatus,
};
use reth_eth_wire_types::{EthMessageID, RawCapabilityMessage};
use reth_ethereum_forks::EthereumHardfork;
use reth_network_peers::pk2id;
use reth_network_types::session::config::PROTOCOL_BREACH_REQUEST_TIMEOUT;
@@ -1161,6 +1168,40 @@ mod tests {
fut.await;
}
#[tokio::test(flavor = "multi_thread")]
async fn test_invalid_message_disconnects_with_protocol_breach() {
let mut builder = SessionBuilder::default();
let listener = TcpListener::bind("127.0.0.1:0").await.unwrap();
let local_addr = listener.local_addr().unwrap();
let fut = builder.with_client_stream(local_addr, async move |mut client_stream| {
client_stream
.start_send_raw(RawCapabilityMessage::eth(
EthMessageID::PooledTransactions,
vec![0xc0].into(),
))
.unwrap();
client_stream.flush().await.unwrap();
let msg = client_stream.next().await.unwrap().unwrap_err();
assert_eq!(msg.as_disconnected(), Some(DisconnectReason::ProtocolBreach));
});
let (tx, rx) = oneshot::channel();
tokio::task::spawn(async move {
let (incoming, _) = listener.accept().await.unwrap();
let session = builder.connect_incoming(incoming).await;
session.await;
tx.send(()).unwrap();
});
fut.await;
rx.await.unwrap();
}
#[tokio::test(flavor = "multi_thread")]
async fn handle_dropped_stream() {
let mut builder = SessionBuilder::default();

View File

@@ -442,8 +442,12 @@ impl<N: NetworkPrimitives> TransactionFetcher<N> {
search_durations.find_idle_peer
);
// peer should always exist since `is_session_active` already checked
let Some(peer) = peers.get(&peer_id) else { return false };
// peer may have disconnected between idle check and here, re-buffer hashes so they
// aren't lost from the pending fetch cache
let Some(peer) = peers.get(&peer_id) else {
self.buffer_hashes(hashes_to_request, None);
return false
};
let conn_eth_version = peer.version;
// fill the request with more hashes pending fetch that have been announced by the peer.
@@ -1449,6 +1453,26 @@ mod test {
)
}
#[test]
fn on_fetch_pending_hashes_rebuffers_on_disconnected_peer() {
let tx_fetcher = &mut TransactionFetcher::default();
let peer_1 = PeerId::new([1; 64]);
let peer_2 = PeerId::new([2; 64]);
let hash_1 = B256::from_slice(&[1; 32]);
buffer_hash_to_tx_fetcher(tx_fetcher, hash_1, peer_1, 0, Some(128));
buffer_hash_to_tx_fetcher(tx_fetcher, hash_1, peer_2, 0, Some(128));
assert_eq!(tx_fetcher.num_pending_hashes(), 1);
// pass empty peers map — both peers are "disconnected"
let peers = HashMap::new();
tx_fetcher.on_fetch_pending_hashes(&peers, |_| true);
// hash should be re-buffered, not lost
assert_eq!(tx_fetcher.num_pending_hashes(), 1);
}
#[test]
fn verify_response_hashes() {
let input = hex!(

View File

@@ -460,13 +460,27 @@ impl<Pool: TransactionPool, N: NetworkPrimitives> TransactionsManager<Pool, N> {
self.network.reputation_change(peer_id, ReputationChangeKind::AlreadySeenTransaction);
}
/// Handles a closed peer session, removing the peer from transaction-local tracking state.
fn on_peer_session_closed(&mut self, peer_id: &PeerId) {
if let Some(mut peer) = self.peers.remove(peer_id) {
self.policies.propagation_policy_mut().on_session_closed(&mut peer);
}
self.transaction_fetcher.remove_peer(peer_id);
}
/// Clear the transaction
fn on_good_import(&mut self, hash: TxHash) {
self.transactions_by_peers.remove(&hash);
}
/// Penalize the peers that intentionally sent the bad transaction, and cache it to avoid
/// fetching or importing it again.
/// Handles a failed transaction import.
///
/// Blob sidecar errors (e.g. invalid proof, missing sidecar) are penalized via
/// `report_peer_bad_transactions` but NOT cached in `bad_imports` — the transaction itself
/// may be valid when fetched from another peer with correct sidecar data.
///
/// Other bad transactions are penalized and cached in `bad_imports` to avoid fetching or
/// importing them again.
///
/// Errors that count as bad transactions are:
///
@@ -491,6 +505,18 @@ impl<Pool: TransactionPool, N: NetworkPrimitives> TransactionsManager<Pool, N> {
fn on_bad_import(&mut self, err: PoolError) {
let peers = self.transactions_by_peers.remove(&err.hash);
if err.is_bad_blob_sidecar() {
// Blob sidecar errors: penalize but do NOT cache the hash as bad.
// The transaction may be valid — only the sidecar from this peer was wrong.
// Using regular penalties means repeated offenders still get disconnected.
if let Some(peers) = peers {
for peer_id in peers {
self.report_peer_bad_transactions(peer_id);
}
}
return
}
// if we're _currently_ syncing, we ignore a bad transaction
if !err.is_bad_transaction() || self.network.is_syncing() {
return
@@ -1246,13 +1272,7 @@ where
fn on_network_event(&mut self, event_result: NetworkEvent<PeerRequest<N>>) {
match event_result {
NetworkEvent::Peer(PeerEvent::SessionClosed { peer_id, .. }) => {
// remove the peer
let peer = self.peers.remove(&peer_id);
if let Some(mut peer) = peer {
self.policies.propagation_policy_mut().on_session_closed(&mut peer);
}
self.transaction_fetcher.remove_peer(&peer_id);
self.on_peer_session_closed(&peer_id);
}
NetworkEvent::ActivePeerSession { info, messages } => {
// process active peer session and broadcast available transaction from the pool
@@ -2167,7 +2187,8 @@ mod tests {
NetworkConfigBuilder, NetworkManager,
};
use alloy_consensus::{TxEip1559, TxLegacy};
use alloy_primitives::{hex, Signature, TxKind, U256};
use alloy_eips::eip4844::BlobTransactionValidationError;
use alloy_primitives::{hex, Signature, TxKind, B256, U256};
use alloy_rlp::Decodable;
use futures::FutureExt;
use reth_chainspec::MIN_TRANSACTION_GAS;
@@ -2179,11 +2200,13 @@ mod tests {
};
use reth_storage_api::noop::NoopProvider;
use reth_tasks::Runtime;
use reth_transaction_pool::test_utils::{
testing_pool, MockTransaction, MockTransactionFactory, TestPool,
use reth_transaction_pool::{
error::{Eip4844PoolTransactionError, InvalidPoolTransactionError, PoolError},
test_utils::{testing_pool, MockTransaction, MockTransactionFactory, TestPool},
};
use secp256k1::SecretKey;
use std::{
collections::HashSet,
future::poll_fn,
net::{IpAddr, Ipv4Addr, SocketAddr},
str::FromStr,
@@ -2511,6 +2534,107 @@ mod tests {
handle.terminate().await;
}
#[tokio::test(flavor = "multi_thread")]
async fn test_session_closed_cleans_transaction_peer_state() {
let (mut tx_manager, _network) = new_tx_manager().await;
let peer_id = PeerId::new([1; 64]);
let fallback_peer = PeerId::new([2; 64]);
let (peer, _) = new_mock_session(peer_id, EthVersion::Eth66);
let hash_shared = B256::from_slice(&[1; 32]);
tx_manager.peers.insert(peer_id, peer);
buffer_hash_to_tx_fetcher(
&mut tx_manager.transaction_fetcher,
hash_shared,
peer_id,
0,
None,
);
buffer_hash_to_tx_fetcher(
&mut tx_manager.transaction_fetcher,
hash_shared,
fallback_peer,
0,
None,
);
tx_manager.transaction_fetcher.active_peers.insert(peer_id, 1);
tx_manager.on_network_event(NetworkEvent::Peer(PeerEvent::SessionClosed {
peer_id,
reason: None,
}));
// peer removed from peers map and active_peers
assert!(!tx_manager.peers.contains_key(&peer_id));
assert!(tx_manager.transaction_fetcher.active_peers.peek(&peer_id).is_none());
// fallback peer is still available for the hash
assert_eq!(
tx_manager.transaction_fetcher.get_idle_peer_for(hash_shared),
Some(&fallback_peer)
);
}
#[tokio::test(flavor = "multi_thread")]
async fn test_bad_blob_sidecar_not_cached_as_bad_import() {
let (mut tx_manager, _network) = new_tx_manager().await;
let peer_id = PeerId::new([1; 64]);
let hash = B256::from_slice(&[1; 32]);
tx_manager.network.update_sync_state(SyncState::Idle);
tx_manager.transactions_by_peers.insert(hash, HashSet::from([peer_id]));
let err = PoolError::new(
hash,
InvalidPoolTransactionError::Eip4844(Eip4844PoolTransactionError::InvalidEip4844Blob(
BlobTransactionValidationError::InvalidProof,
)),
);
tx_manager.on_bad_import(err);
assert!(!tx_manager.bad_imports.contains(&hash));
}
#[tokio::test(flavor = "multi_thread")]
async fn test_missing_blob_sidecar_not_cached_as_bad_import() {
let (mut tx_manager, _network) = new_tx_manager().await;
let peer_id = PeerId::new([1; 64]);
let hash = B256::from_slice(&[3; 32]);
tx_manager.network.update_sync_state(SyncState::Idle);
tx_manager.transactions_by_peers.insert(hash, HashSet::from([peer_id]));
let err = PoolError::new(
hash,
InvalidPoolTransactionError::Eip4844(
Eip4844PoolTransactionError::MissingEip4844BlobSidecar,
),
);
tx_manager.on_bad_import(err);
assert!(!tx_manager.bad_imports.contains(&hash));
}
#[tokio::test(flavor = "multi_thread")]
async fn test_non_blob_sidecar_error_still_cached_as_bad_import() {
let (mut tx_manager, _network) = new_tx_manager().await;
let peer_id = PeerId::new([1; 64]);
let hash = B256::from_slice(&[2; 32]);
tx_manager.network.update_sync_state(SyncState::Idle);
tx_manager.transactions_by_peers.insert(hash, HashSet::from([peer_id]));
let err = PoolError::new(
hash,
InvalidPoolTransactionError::Eip4844(Eip4844PoolTransactionError::NoEip4844Blobs),
);
tx_manager.on_bad_import(err);
assert!(tx_manager.bad_imports.contains(&hash));
}
#[tokio::test(flavor = "multi_thread")]
async fn test_on_get_pooled_transactions_network() {
reth_tracing::init_test_tracing();

View File

@@ -32,7 +32,7 @@ use reth_node_core::{
primitives::Head,
};
use reth_provider::{
providers::{BlockchainProvider, NodeTypesForProvider},
providers::{BlockchainProvider, NodeTypesForProvider, RocksDBProvider},
ChainSpecProvider, FullProvider,
};
use reth_tasks::TaskExecutor;
@@ -154,12 +154,14 @@ pub struct NodeBuilder<DB, ChainSpec> {
config: NodeConfig<ChainSpec>,
/// The configured database for the node.
database: DB,
/// An optional [`RocksDBProvider`] to use instead of creating one during launch.
rocksdb_provider: Option<RocksDBProvider>,
}
impl<ChainSpec> NodeBuilder<(), ChainSpec> {
/// Create a new [`NodeBuilder`].
pub const fn new(config: NodeConfig<ChainSpec>) -> Self {
Self { config, database: () }
Self { config, database: (), rocksdb_provider: None }
}
}
@@ -228,7 +230,13 @@ impl<DB, ChainSpec> NodeBuilder<DB, ChainSpec> {
impl<DB, ChainSpec: EthChainSpec> NodeBuilder<DB, ChainSpec> {
/// Configures the underlying database that the node will use.
pub fn with_database<D>(self, database: D) -> NodeBuilder<D, ChainSpec> {
NodeBuilder { config: self.config, database }
NodeBuilder { config: self.config, database, rocksdb_provider: self.rocksdb_provider }
}
/// Sets the [`RocksDBProvider`] to use instead of creating one during launch.
pub fn with_rocksdb_provider(mut self, rocksdb_provider: RocksDBProvider) -> Self {
self.rocksdb_provider = Some(rocksdb_provider);
self
}
/// Preconfigure the builder with the context to launch the node.
@@ -297,7 +305,7 @@ where
T: NodeTypesForProvider<ChainSpec = ChainSpec>,
P: FullProvider<NodeTypesWithDBAdapter<T, DB>>,
{
NodeBuilderWithTypes::new(self.config, self.database)
NodeBuilderWithTypes::new(self.config, self.database, self.rocksdb_provider)
}
/// Preconfigures the node with a specific node implementation.
@@ -347,6 +355,12 @@ where
DB: Database + DatabaseMetrics + Clone + Unpin + 'static,
ChainSpec: EthChainSpec + EthereumHardforks,
{
/// Sets the [`RocksDBProvider`] to use instead of creating one during launch.
pub fn with_rocksdb_provider(mut self, rocksdb_provider: RocksDBProvider) -> Self {
self.builder.rocksdb_provider = Some(rocksdb_provider);
self
}
/// Configures the types of the node.
pub fn with_types<T>(self) -> WithLaunchContext<NodeBuilderWithTypes<RethFullAdapter<DB, T>>>
where

View File

@@ -16,6 +16,7 @@ use crate::{
use reth_exex::ExExContext;
use reth_node_api::{FullNodeComponents, FullNodeTypes, NodeAddOns, NodeTypes};
use reth_node_core::node_config::NodeConfig;
use reth_provider::providers::RocksDBProvider;
use reth_tasks::TaskExecutor;
use std::{fmt, fmt::Debug, future::Future};
@@ -25,6 +26,8 @@ pub struct NodeBuilderWithTypes<T: FullNodeTypes> {
config: NodeConfig<<T::Types as NodeTypes>::ChainSpec>,
/// The configured database for the node.
adapter: NodeTypesAdapter<T>,
/// An optional [`RocksDBProvider`] to use instead of creating one during launch.
rocksdb_provider: Option<RocksDBProvider>,
}
impl<T: FullNodeTypes> NodeBuilderWithTypes<T> {
@@ -32,8 +35,9 @@ impl<T: FullNodeTypes> NodeBuilderWithTypes<T> {
pub const fn new(
config: NodeConfig<<T::Types as NodeTypes>::ChainSpec>,
database: T::DB,
rocksdb_provider: Option<RocksDBProvider>,
) -> Self {
Self { config, adapter: NodeTypesAdapter::new(database) }
Self { config, adapter: NodeTypesAdapter::new(database), rocksdb_provider }
}
/// Advances the state of the node builder to the next state where all components are configured
@@ -41,11 +45,12 @@ impl<T: FullNodeTypes> NodeBuilderWithTypes<T> {
where
CB: NodeComponentsBuilder<T>,
{
let Self { config, adapter } = self;
let Self { config, adapter, rocksdb_provider } = self;
NodeBuilderWithComponents {
config,
adapter,
rocksdb_provider,
components_builder,
add_ons: AddOns { hooks: NodeHooks::default(), exexs: Vec::new(), add_ons: () },
}
@@ -150,6 +155,8 @@ pub struct NodeBuilderWithComponents<
pub config: NodeConfig<<T::Types as NodeTypes>::ChainSpec>,
/// Adapter for the underlying node types and database
pub adapter: NodeTypesAdapter<T>,
/// An optional [`RocksDBProvider`] to use instead of creating one during launch.
pub rocksdb_provider: Option<RocksDBProvider>,
/// container for type specific components
pub components_builder: CB,
/// Additional node extensions.
@@ -167,11 +174,12 @@ where
where
AO: NodeAddOns<NodeAdapter<T, CB::Components>>,
{
let Self { config, adapter, components_builder, .. } = self;
let Self { config, adapter, rocksdb_provider, components_builder, .. } = self;
NodeBuilderWithComponents {
config,
adapter,
rocksdb_provider,
components_builder,
add_ons: AddOns { hooks: NodeHooks::default(), exexs: Vec::new(), add_ons },
}

View File

@@ -472,6 +472,7 @@ where
pub async fn create_provider_factory<N, Evm>(
&self,
changeset_cache: ChangesetCache,
rocksdb_provider: Option<RocksDBProvider>,
) -> eyre::Result<ProviderFactory<N>>
where
N: ProviderNodeTypes<DB = DB, ChainSpec = ChainSpec>,
@@ -489,13 +490,18 @@ where
.with_genesis_block_number(self.chain_spec().genesis().number.unwrap_or_default())
.build()?;
// Initialize RocksDB provider with metrics, statistics, and default tables
let rocksdb_provider = RocksDBProvider::builder(self.data_dir().rocksdb())
.with_default_tables()
.with_metrics()
.with_statistics()
.build()?;
// Use the provided RocksDB provider or create a new one
let rocksdb_provider = if let Some(provider) = rocksdb_provider {
provider
} else {
RocksDBProvider::builder(self.data_dir().rocksdb())
.with_default_tables()
.with_metrics()
.with_statistics()
.build()?
};
let prune_config = self.prune_config();
let factory = ProviderFactory::new(
self.right().clone(),
self.chain_spec(),
@@ -503,7 +509,8 @@ where
rocksdb_provider,
self.task_executor().clone(),
)?
.with_prune_modes(self.prune_modes())
.with_prune_modes(prune_config.segments)
.with_minimum_pruning_distance(prune_config.minimum_pruning_distance)
.with_changeset_cache(changeset_cache);
// Check consistency between the database and static files, returning
@@ -573,12 +580,14 @@ where
pub async fn with_provider_factory<N, Evm>(
self,
changeset_cache: ChangesetCache,
rocksdb_provider: Option<RocksDBProvider>,
) -> eyre::Result<LaunchContextWith<Attached<WithConfigs<ChainSpec>, ProviderFactory<N>>>>
where
N: ProviderNodeTypes<DB = DB, ChainSpec = ChainSpec>,
Evm: ConfigureEvm<Primitives = N::Primitives> + 'static,
{
let factory = self.create_provider_factory::<N, Evm>(changeset_cache).await?;
let factory =
self.create_provider_factory::<N, Evm>(changeset_cache, rocksdb_provider).await?;
let ctx = LaunchContextWith {
inner: self.inner,
attachment: self.attachment.map_right(|_| factory),
@@ -1303,6 +1312,7 @@ mod tests {
bodies_distance: None,
receipts_log_filter: None,
bodies_before: None,
minimum_distance: None,
},
..NodeConfig::test()
};

View File

@@ -81,6 +81,7 @@ impl EngineNodeLauncher {
let Self { ctx, engine_tree_config } = self;
let NodeBuilderWithComponents {
adapter: NodeTypesAdapter { database },
rocksdb_provider,
components_builder,
add_ons: AddOns { hooks, exexs: installed_exex, add_ons },
config,
@@ -102,7 +103,7 @@ impl EngineNodeLauncher {
// ensure certain settings take effect
.with_adjusted_configs()
// Create the provider factory with changeset cache
.with_provider_factory::<_, <CB::Components as NodeComponents<T>>::Evm>(changeset_cache.clone()).await?
.with_provider_factory::<_, <CB::Components as NodeComponents<T>>::Evm>(changeset_cache.clone(), rocksdb_provider).await?
.inspect(|_| {
info!(target: "reth::cli", "Database opened");
})

View File

@@ -93,12 +93,6 @@ min-trace-logs = ["tracing/release_max_level_trace"]
# Debug recording for sparse trie mutations
trie-debug = ["reth-engine-primitives/trie-debug"]
# Route supported tables to RocksDB instead of MDBX
rocksdb = ["reth-storage-api/rocksdb"]
# Marker feature for edge/unstable builds - enables rocksdb and sets v2 defaults
edge = ["rocksdb"]
[build-dependencies]
vergen = { workspace = true, features = ["build", "cargo", "emit_and_set"] }
vergen-git2.workspace = true

View File

@@ -815,14 +815,16 @@ impl DiscoveryArgs {
SocketAddr::V6(addr) => Some(*addr.ip()),
});
let mut discv5_config_builder =
reth_discv5::discv5::ConfigBuilder::new(ListenConfig::from_two_sockets(
discv5_addr_ipv4.map(|addr| SocketAddrV4::new(addr, *discv5_port)),
discv5_addr_ipv6.map(|addr| SocketAddrV6::new(addr, *discv5_port_ipv6, 0, 0)),
));
if discv5_addr.is_some() || discv5_addr_ipv6.is_some() || self.disable_nat {
discv5_config_builder.disable_enr_update();
}
reth_discv5::Config::builder(rlpx_tcp_socket)
.discv5_config(
reth_discv5::discv5::ConfigBuilder::new(ListenConfig::from_two_sockets(
discv5_addr_ipv4.map(|addr| SocketAddrV4::new(addr, *discv5_port)),
discv5_addr_ipv6.map(|addr| SocketAddrV6::new(addr, *discv5_port_ipv6, 0, 0)),
))
.build(),
)
.discv5_config(discv5_config_builder.build())
.add_unsigned_boot_nodes(boot_nodes)
.lookup_interval(*discv5_lookup_interval)
.bootstrap_lookup_interval(*discv5_bootstrap_lookup_interval)

View File

@@ -196,6 +196,11 @@ pub struct PruningArgs {
/// pruned.
#[arg(long = "prune.bodies.before", value_name = "BLOCK_NUMBER", conflicts_with_all = &["bodies_distance", "bodies_pre_merge"])]
pub bodies_before: Option<BlockNumber>,
/// Minimum pruning distance from the tip. This controls the safety margin for reorgs and
/// manual unwinds.
#[arg(long = "prune.minimum-distance", value_name = "BLOCKS")]
pub minimum_distance: Option<u64>,
}
impl PruningArgs {
@@ -220,7 +225,11 @@ impl PruningArgs {
.block_number()
.map(PruneMode::Before);
}
config = PruneConfig { block_interval: config.block_interval, segments }
config = PruneConfig {
block_interval: config.block_interval,
segments,
minimum_pruning_distance: config.minimum_pruning_distance,
}
}
// If --minimal is set, use minimal storage mode with aggressive pruning.
@@ -228,6 +237,7 @@ impl PruningArgs {
config = PruneConfig {
block_interval: config.block_interval,
segments: DefaultPruningValues::get_global().minimal_prune_modes.clone(),
minimum_pruning_distance: config.minimum_pruning_distance,
}
}
@@ -235,6 +245,9 @@ impl PruningArgs {
if let Some(block_interval) = self.block_interval {
config.block_interval = block_interval as usize;
}
if let Some(distance) = self.minimum_distance {
config.minimum_pruning_distance = distance;
}
if let Some(mode) = self.sender_recovery_prune_mode() {
config.segments.sender_recovery = Some(mode);
}

View File

@@ -21,7 +21,7 @@ alloy-primitives.workspace = true
alloy-consensus.workspace = true
tokio.workspace = true
tokio-tungstenite.workspace = true
tokio-tungstenite = { workspace = true, features = ["rustls-tls-native-roots"] }
futures-util.workspace = true
tokio-stream.workspace = true

View File

@@ -42,13 +42,10 @@ rayon.workspace = true
tokio.workspace = true
rustc-hash.workspace = true
[features]
rocksdb = ["reth-provider/rocksdb"]
[dev-dependencies]
# reth
reth-db = { workspace = true, features = ["test-utils"] }
reth-stages = { workspace = true, features = ["test-utils", "rocksdb"] }
reth-stages = { workspace = true, features = ["test-utils"] }
reth-primitives-traits = { workspace = true, features = ["arbitrary"] }
reth-testing-utils.workspace = true
reth-tracing.workspace = true

View File

@@ -74,7 +74,6 @@ where
let range_end = *range.end();
// Check where account history indices are stored
#[cfg(all(unix, feature = "rocksdb"))]
if provider.cached_storage_settings().storage_v2 {
return self.prune_rocksdb(provider, input, range, range_end);
}
@@ -232,7 +231,6 @@ impl AccountHistory {
///
/// Reads account changesets from static files and prunes the corresponding
/// `RocksDB` history shards.
#[cfg(all(unix, feature = "rocksdb"))]
fn prune_rocksdb<Provider>(
&self,
provider: &Provider,
@@ -506,157 +504,6 @@ mod tests {
test_prune(1400, 3, (PruneProgress::Finished, 804));
}
/// Tests the `prune_static_files` code path. On unix with rocksdb feature, v2 storage
/// routes to `prune_rocksdb` instead, so this test only runs without rocksdb (the
/// `prune_rocksdb_path` test covers that configuration).
#[test]
#[cfg(not(all(unix, feature = "rocksdb")))]
fn prune_static_file() {
let db = TestStageDB::default();
let mut rng = generators::rng();
let blocks = random_block_range(
&mut rng,
0..=5000,
BlockRangeParams { parent: Some(B256::ZERO), tx_count: 0..1, ..Default::default() },
);
db.insert_blocks(blocks.iter(), StorageKind::Database(None)).expect("insert blocks");
let accounts = random_eoa_accounts(&mut rng, 2).into_iter().collect::<BTreeMap<_, _>>();
let (changesets, _) = random_changeset_range(
&mut rng,
blocks.iter(),
accounts.into_iter().map(|(addr, acc)| (addr, (acc, Vec::new()))),
0..0,
0..0,
);
db.insert_changesets_to_static_files(changesets.clone(), None)
.expect("insert changesets to static files");
db.insert_history(changesets.clone(), None).expect("insert history");
let account_occurrences = db.table::<tables::AccountsHistory>().unwrap().into_iter().fold(
BTreeMap::<_, usize>::new(),
|mut map, (key, _)| {
map.entry(key.key).or_default().add_assign(1);
map
},
);
assert!(account_occurrences.into_iter().any(|(_, occurrences)| occurrences > 1));
let original_shards = db.table::<tables::AccountsHistory>().unwrap();
let test_prune =
|to_block: BlockNumber, run: usize, expected_result: (PruneProgress, usize)| {
let prune_mode = PruneMode::Before(to_block);
let deleted_entries_limit = 2000;
let mut limiter =
PruneLimiter::default().set_deleted_entries_limit(deleted_entries_limit);
let input = PruneInput {
previous_checkpoint: db
.factory
.provider()
.unwrap()
.get_prune_checkpoint(PruneSegment::AccountHistory)
.unwrap(),
to_block,
limiter: limiter.clone(),
};
let segment = AccountHistory::new(prune_mode);
let provider = db.factory.database_provider_rw().unwrap();
provider.set_storage_settings_cache(StorageSettings::v2());
let result = segment.prune(&provider, input).unwrap();
limiter.increment_deleted_entries_count_by(result.pruned);
assert_matches!(
result,
SegmentOutput {progress, pruned, checkpoint: Some(_)}
if (progress, pruned) == expected_result
);
segment
.save_checkpoint(
&provider,
result.checkpoint.unwrap().as_prune_checkpoint(prune_mode),
)
.unwrap();
provider.commit().expect("commit");
let changesets = changesets
.iter()
.enumerate()
.flat_map(|(block_number, changeset)| {
changeset.iter().map(move |change| (block_number, change))
})
.collect::<Vec<_>>();
#[expect(clippy::skip_while_next)]
let pruned = changesets
.iter()
.enumerate()
.skip_while(|(i, (block_number, _))| {
*i < deleted_entries_limit / ACCOUNT_HISTORY_TABLES_TO_PRUNE * run &&
*block_number <= to_block as usize
})
.next()
.map(|(i, _)| i)
.unwrap_or_default();
// Skip what we've pruned so far, subtracting one to get last pruned block number
// further down
let mut pruned_changesets = changesets.iter().skip(pruned.saturating_sub(1));
let last_pruned_block_number = pruned_changesets
.next()
.map(|(block_number, _)| {
(if result.progress.is_finished() {
*block_number
} else {
block_number.saturating_sub(1)
}) as BlockNumber
})
.unwrap_or(to_block);
let actual_shards = db.table::<tables::AccountsHistory>().unwrap();
let expected_shards = original_shards
.iter()
.filter(|(key, _)| key.highest_block_number > last_pruned_block_number)
.map(|(key, blocks)| {
let new_blocks =
blocks.iter().skip_while(|block| *block <= last_pruned_block_number);
(key.clone(), BlockNumberList::new_pre_sorted(new_blocks))
})
.collect::<Vec<_>>();
assert_eq!(actual_shards, expected_shards);
assert_eq!(
db.factory
.provider()
.unwrap()
.get_prune_checkpoint(PruneSegment::AccountHistory)
.unwrap(),
Some(PruneCheckpoint {
block_number: Some(last_pruned_block_number),
tx_number: None,
prune_mode
})
);
};
test_prune(
998,
1,
(PruneProgress::HasMoreData(PruneInterruptReason::DeletedEntriesLimitReached), 1000),
);
test_prune(998, 2, (PruneProgress::Finished, 1000));
test_prune(1400, 3, (PruneProgress::Finished, 804));
}
#[cfg(all(unix, feature = "rocksdb"))]
#[test]
fn prune_rocksdb_path() {
use reth_db_api::models::ShardedKey;

View File

@@ -75,7 +75,6 @@ where
let range_end = *range.end();
// Check where storage history indices are stored
#[cfg(all(unix, feature = "rocksdb"))]
if provider.cached_storage_settings().storage_v2 {
return self.prune_rocksdb(provider, input, range, range_end);
}
@@ -236,7 +235,6 @@ impl StorageHistory {
///
/// Reads storage changesets from static files and prunes the corresponding
/// `RocksDB` history shards.
#[cfg(all(unix, feature = "rocksdb"))]
fn prune_rocksdb<Provider>(
&self,
provider: &Provider,
@@ -518,159 +516,6 @@ mod tests {
test_prune(1200, 3, (PruneProgress::Finished, 202));
}
/// Tests the `prune_static_files` code path. On unix with rocksdb feature, v2 storage
/// routes to `prune_rocksdb` instead, so this test only runs without rocksdb (the
/// `prune_rocksdb_path` test covers that configuration).
#[test]
#[cfg(not(all(unix, feature = "rocksdb")))]
fn prune_static_file() {
let db = TestStageDB::default();
let mut rng = generators::rng();
let blocks = random_block_range(
&mut rng,
0..=5000,
BlockRangeParams { parent: Some(B256::ZERO), tx_count: 0..1, ..Default::default() },
);
db.insert_blocks(blocks.iter(), StorageKind::Database(None)).expect("insert blocks");
let accounts = random_eoa_accounts(&mut rng, 2).into_iter().collect::<BTreeMap<_, _>>();
let (changesets, _) = random_changeset_range(
&mut rng,
blocks.iter(),
accounts.into_iter().map(|(addr, acc)| (addr, (acc, Vec::new()))),
1..2,
1..2,
);
db.insert_changesets_to_static_files(changesets.clone(), None)
.expect("insert changesets to static files");
db.insert_history(changesets.clone(), None).expect("insert history");
let storage_occurrences = db.table::<tables::StoragesHistory>().unwrap().into_iter().fold(
BTreeMap::<_, usize>::new(),
|mut map, (key, _)| {
map.entry((key.address, key.sharded_key.key)).or_default().add_assign(1);
map
},
);
assert!(storage_occurrences.into_iter().any(|(_, occurrences)| occurrences > 1));
let original_shards = db.table::<tables::StoragesHistory>().unwrap();
let test_prune = |to_block: BlockNumber,
run: usize,
expected_result: (PruneProgress, usize)| {
let prune_mode = PruneMode::Before(to_block);
let deleted_entries_limit = 1000;
let mut limiter =
PruneLimiter::default().set_deleted_entries_limit(deleted_entries_limit);
let input = PruneInput {
previous_checkpoint: db
.factory
.provider()
.unwrap()
.get_prune_checkpoint(PruneSegment::StorageHistory)
.unwrap(),
to_block,
limiter: limiter.clone(),
};
let segment = StorageHistory::new(prune_mode);
let provider = db.factory.database_provider_rw().unwrap();
provider.set_storage_settings_cache(StorageSettings::v2());
let result = segment.prune(&provider, input).unwrap();
limiter.increment_deleted_entries_count_by(result.pruned);
assert_matches!(
result,
SegmentOutput {progress, pruned, checkpoint: Some(_)}
if (progress, pruned) == expected_result
);
segment
.save_checkpoint(
&provider,
result.checkpoint.unwrap().as_prune_checkpoint(prune_mode),
)
.unwrap();
provider.commit().expect("commit");
let changesets = changesets
.iter()
.enumerate()
.flat_map(|(block_number, changeset)| {
changeset.iter().flat_map(move |(address, _, entries)| {
entries.iter().map(move |entry| (block_number, address, entry))
})
})
.collect::<Vec<_>>();
#[expect(clippy::skip_while_next)]
let pruned = changesets
.iter()
.enumerate()
.skip_while(|(i, (block_number, _, _))| {
*i < deleted_entries_limit / STORAGE_HISTORY_TABLES_TO_PRUNE * run &&
*block_number <= to_block as usize
})
.next()
.map(|(i, _)| i)
.unwrap_or_default();
// Skip what we've pruned so far, subtracting one to get last pruned block number
// further down
let mut pruned_changesets = changesets.iter().skip(pruned.saturating_sub(1));
let last_pruned_block_number = pruned_changesets
.next()
.map(|(block_number, _, _)| {
(if result.progress.is_finished() {
*block_number
} else {
block_number.saturating_sub(1)
}) as BlockNumber
})
.unwrap_or(to_block);
let actual_shards = db.table::<tables::StoragesHistory>().unwrap();
let expected_shards = original_shards
.iter()
.filter(|(key, _)| key.sharded_key.highest_block_number > last_pruned_block_number)
.map(|(key, blocks)| {
let new_blocks =
blocks.iter().skip_while(|block| *block <= last_pruned_block_number);
(key.clone(), BlockNumberList::new_pre_sorted(new_blocks))
})
.collect::<Vec<_>>();
assert_eq!(actual_shards, expected_shards);
assert_eq!(
db.factory
.provider()
.unwrap()
.get_prune_checkpoint(PruneSegment::StorageHistory)
.unwrap(),
Some(PruneCheckpoint {
block_number: Some(last_pruned_block_number),
tx_number: None,
prune_mode
})
);
};
test_prune(
998,
1,
(PruneProgress::HasMoreData(PruneInterruptReason::DeletedEntriesLimitReached), 500),
);
test_prune(998, 2, (PruneProgress::Finished, 500));
test_prune(1200, 3, (PruneProgress::Finished, 202));
}
/// Tests that when a limiter stops mid-block (with multiple storage changes for the same
/// block), the checkpoint is set to `block_number - 1` to avoid dangling index entries.
#[test]
@@ -821,7 +666,6 @@ mod tests {
assert!(final_changesets.is_empty(), "All changesets up to block 10 should be pruned");
}
#[cfg(all(unix, feature = "rocksdb"))]
#[test]
fn prune_rocksdb() {
use reth_db_api::models::storage_sharded_key::StorageShardedKey;

View File

@@ -95,7 +95,6 @@ where
.into_inner();
// Check where transaction hash numbers are stored
#[cfg(all(unix, feature = "rocksdb"))]
if provider.cached_storage_settings().storage_v2 {
return self.prune_rocksdb(provider, input, start, end);
}
@@ -196,7 +195,6 @@ impl TransactionLookup {
///
/// Reads transactions from static files and deletes corresponding entries
/// from the `RocksDB` `TransactionHashNumbers` table.
#[cfg(all(unix, feature = "rocksdb"))]
fn prune_rocksdb<Provider>(
&self,
provider: &Provider,
@@ -438,7 +436,6 @@ mod tests {
test_prune(10, (PruneProgress::Finished, 8));
}
#[cfg(all(unix, feature = "rocksdb"))]
#[test]
fn prune_rocksdb() {
use reth_db_api::models::StorageSettings;
@@ -539,7 +536,6 @@ mod tests {
/// 1. Some transactions have already been pruned (checkpoint at tx 5)
/// 2. The deleted entries limit is exhausted before any new deletions
/// 3. The checkpoint should NOT advance to the next start position
#[cfg(all(unix, feature = "rocksdb"))]
#[test]
fn prune_rocksdb_zero_deleted_checkpoint() {
use reth_db_api::models::StorageSettings;

View File

@@ -57,8 +57,8 @@ pub mod servers {
web3::Web3ApiServer,
};
pub use reth_rpc_eth_api::{
self as eth, EthApiServer, EthBundleApiServer, EthCallBundleApiServer, EthFilterApiServer,
EthPubSubApiServer, L2EthApiExtServer,
self as eth, EthApiServer, EthBundleApiServer, EthCallBundleApiServer, EthConfigApiServer,
EthFilterApiServer, EthPubSubApiServer, L2EthApiExtServer,
};
}
@@ -81,7 +81,7 @@ pub mod clients {
otterscan::OtterscanClient,
reth::RethApiClient,
reth_engine::RethEngineApiClient,
rpc::RpcApiServer,
rpc::RpcApiClient,
testing::TestingApiClient,
trace::TraceApiClient,
txpool::TxPoolApiClient,
@@ -89,7 +89,7 @@ pub mod clients {
web3::Web3ApiClient,
};
pub use reth_rpc_eth_api::{
EthApiClient, EthBundleApiClient, EthCallBundleApiClient, EthFilterApiClient,
L2EthApiExtServer,
EthApiClient, EthBundleApiClient, EthCallBundleApiClient, EthConfigApiClient,
EthFilterApiClient, L2EthApiExtClient,
};
}

View File

@@ -394,20 +394,13 @@ where
where
EthApi: FullEthApiServer<Provider = Provider, Pool = Pool>,
{
let mut modules = TransportRpcModules::default();
if !module_config.is_empty() {
let TransportRpcModuleConfig { http, ws, ipc, config } = module_config.clone();
let mut registry = self.into_registry(config.unwrap_or_default(), eth, engine_events);
modules.config = module_config;
modules.http = registry.maybe_module(http.as_ref());
modules.ws = registry.maybe_module(ws.as_ref());
modules.ipc = registry.maybe_module(ipc.as_ref());
if module_config.is_empty() {
TransportRpcModules::default()
} else {
let config = module_config.config.clone().unwrap_or_default();
let mut registry = self.into_registry(config, eth, engine_events);
registry.create_transport_rpc_modules(module_config)
}
modules
}
}

View File

@@ -127,9 +127,11 @@ pub trait EthCall: EstimateCall + Call + LoadPendingBlock + LoadBlock + FullEthA
.into());
}
let attributes = this.next_env_attributes(&parent)?;
let mut evm_env = this
.evm_config()
.next_evm_env(&parent, &this.next_env_attributes(&parent)?)
.next_evm_env(&parent, &attributes)
.map_err(RethError::other)
.map_err(Self::Error::from_eth_err)?;
@@ -205,7 +207,7 @@ pub trait EthCall: EstimateCall + Call + LoadPendingBlock + LoadBlock + FullEthA
let ctx = this
.evm_config()
.context_for_next_block(&parent, this.next_env_attributes(&parent)?)
.context_for_next_block(&parent, attributes)
.map_err(RethError::other)
.map_err(Self::Error::from_eth_err)?;
let map_err = |e: EthApiError| -> Self::Error {
@@ -477,7 +479,11 @@ pub trait EthCall: EstimateCall + Call + LoadPendingBlock + LoadBlock + FullEthA
.map_err(Self::Error::from_eth_err)?;
}
let mut tx_env = this.create_txn_env(&evm_env, request.clone(), &mut db)?;
// Read fields from request before consuming it in create_txn_env
let request_has_gas_limit = request.as_ref().gas_limit().is_some();
let initial = request.as_ref().access_list().cloned().unwrap_or_default();
let mut tx_env = this.create_txn_env(&evm_env, request, &mut db)?;
// we want to disable this in eth_createAccessList, since this is common practice used
// by other node impls and providers <https://github.com/foundry-rs/foundry/issues/4388>
@@ -491,21 +497,22 @@ pub trait EthCall: EstimateCall + Call + LoadPendingBlock + LoadBlock + FullEthA
// Disabled because eth_createAccessList is sometimes used with non-eoa senders
evm_env.cfg_env.disable_eip3607 = true;
// Disable additional fee charges (e.g. L2 operator fees),
// consistent with prepare_call_env and estimate_gas_with.
evm_env.cfg_env.disable_fee_charge = true;
// Disable EIP-7825 transaction gas limit cap so that the gas limit
// fallback (block gas limit) is not rejected when it exceeds the
// per-tx cap (2^24 ≈ 16.7M post-Osaka).
evm_env.cfg_env.tx_gas_limit_cap = Some(u64::MAX);
if request.as_ref().gas_limit().is_none() && tx_env.gas_price() > 0 {
if !request_has_gas_limit && tx_env.gas_price() > 0 {
let cap = this.caller_gas_allowance(&mut db, &evm_env, &tx_env)?;
// no gas limit was provided in the request, so we need to cap the request's gas
// limit
tx_env.set_gas_limit(cap.min(evm_env.block_env.gas_limit()));
}
// can consume the list since we're not using the request anymore
let initial = request.as_ref().access_list().cloned().unwrap_or_default();
let mut inspector = AccessListInspector::new(initial);
let result = this.inspect(&mut db, evm_env.clone(), tx_env.clone(), &mut inspector)?;

View File

@@ -104,11 +104,16 @@ where
fork_timestamps.sort_unstable();
fork_timestamps.dedup();
let (current_fork_idx, current_fork_timestamp) = fork_timestamps
.iter()
.position(|ts| &latest.timestamp() < ts)
.and_then(|idx| idx.checked_sub(1))
.or_else(|| fork_timestamps.len().checked_sub(1))
let current_fork_idx = match fork_timestamps.iter().position(|ts| &latest.timestamp() < ts)
{
// All forks are in the past, use the last one.
None => fork_timestamps.len().checked_sub(1),
// First fork hasn't activated yet — no active timestamp fork.
Some(0) => None,
// Found a future fork; current is the one right before it.
Some(idx) => Some(idx - 1),
};
let (current_fork_idx, current_fork_timestamp) = current_fork_idx
.and_then(|idx| fork_timestamps.get(idx).map(|ts| (idx, *ts)))
.ok_or_else(|| RethError::msg("no active timestamp fork found"))?;

View File

@@ -42,6 +42,7 @@ pub trait EstimateCall: Call {
///
/// - `disable_eip3607` is set to `true`
/// - `disable_base_fee` is set to `true`
/// - `disable_fee_charge` is set to `true`
/// - `nonce` is set to `None`
fn estimate_gas_with<S>(
&self,
@@ -62,6 +63,10 @@ pub trait EstimateCall: Call {
// <https://github.com/ethereum/go-ethereum/blob/ee8e83fa5f6cb261dad2ed0a7bbcde4930c41e6c/internal/ethapi/api.go#L985>
evm_env.cfg_env.disable_base_fee = true;
// Disable additional fee charges (e.g. L2 operator fees) for gas estimation,
// consistent with `prepare_call_env` for `eth_call`.
evm_env.cfg_env.disable_fee_charge = true;
// set nonce to None so that the correct nonce is chosen by the EVM
request.as_mut().take_nonce();

View File

@@ -318,7 +318,8 @@ pub trait LoadPendingBlock:
// There's only limited amount of blob space available per block, so we need to
// check if the EIP-4844 can still fit in the block
if let Some(tx_blob_gas) = tx.blob_gas_used() &&
let tx_blob_gas = tx.blob_gas_used();
if let Some(tx_blob_gas) = tx_blob_gas &&
sum_blob_gas_used + tx_blob_gas > blob_params.max_blob_gas_per_block()
{
// we can't fit this _blob_ transaction into the block, so we mark it as
@@ -335,7 +336,7 @@ pub trait LoadPendingBlock:
continue
}
let gas_used = match builder.execute_transaction(tx.clone()) {
let gas_used = match builder.execute_transaction(tx) {
Ok(gas_used) => gas_used,
Err(BlockExecutionError::Validation(BlockValidationError::InvalidTx {
error,
@@ -360,7 +361,7 @@ pub trait LoadPendingBlock:
};
// add to the total blob gas used if the transaction successfully executed
if let Some(tx_blob_gas) = tx.blob_gas_used() {
if let Some(tx_blob_gas) = tx_blob_gas {
sum_blob_gas_used += tx_blob_gas;
// if we've reached the max data gas per block, we can skip blob txs entirely

View File

@@ -31,4 +31,46 @@ pub trait EthSigner<T, TxReq = TransactionRequest>: Send + Sync + DynClone {
fn sign_typed_data(&self, address: Address, payload: &TypedData) -> Result<Signature>;
}
dyn_clone::clone_trait_object!(<T> EthSigner<T>);
dyn_clone::clone_trait_object!(<T, TxReq> EthSigner<T, TxReq>);
#[cfg(test)]
mod tests {
use super::*;
#[derive(Clone)]
struct MockSigner;
struct MockSignedTx;
struct MockTxReq;
#[async_trait::async_trait]
impl EthSigner<MockSignedTx, MockTxReq> for MockSigner {
fn accounts(&self) -> Vec<Address> {
Vec::new()
}
async fn sign(&self, _address: Address, _message: &[u8]) -> Result<Signature> {
Err(SignError::NoAccount)
}
async fn sign_transaction(
&self,
_request: MockTxReq,
_address: &Address,
) -> Result<MockSignedTx> {
Err(SignError::NoAccount)
}
fn sign_typed_data(&self, _address: Address, _payload: &TypedData) -> Result<Signature> {
Err(SignError::NoAccount)
}
}
#[test]
fn clones_trait_object_with_custom_tx_request_type() {
let signer: Box<dyn EthSigner<MockSignedTx, MockTxReq>> = Box::new(MockSigner);
let cloned: Box<dyn EthSigner<MockSignedTx, MockTxReq>> = dyn_clone::clone_box(&*signer);
assert!(cloned.accounts().is_empty());
}
}

View File

@@ -25,6 +25,7 @@ pub use bundle::{EthBundleApiServer, EthCallBundleApiServer};
pub use core::{EthApiServer, FullEthApiServer};
pub use ext::L2EthApiExtServer;
pub use filter::{EngineEthFilter, EthFilterApiServer, QueryLimits};
pub use helpers::config::EthConfigApiServer;
pub use node::{RpcNodeCore, RpcNodeCoreExt};
pub use pubsub::EthPubSubApiServer;
pub use reth_rpc_convert::*;
@@ -41,5 +42,7 @@ pub use core::EthApiClient;
pub use ext::L2EthApiExtClient;
#[cfg(feature = "client")]
pub use filter::EthFilterApiClient;
#[cfg(feature = "client")]
pub use helpers::config::EthConfigApiClient;
use reth_trie_common as _;

View File

@@ -175,7 +175,8 @@ where
};
if block_values.is_empty() {
results.push(U256::from(inner.last_price.price));
// For empty blocks, use zero gas price to signal no demand
results.push(U256::ZERO);
} else {
results.extend(block_values);
populated_blocks += 1;

View File

@@ -295,6 +295,7 @@ where
fn extract_reward_traces<H: BlockHeader>(
&self,
header: &H,
block_hash: BlockHash,
ommers: Option<&[H]>,
base_block_reward: u128,
) -> Vec<LocalizedTransactionTrace> {
@@ -303,6 +304,7 @@ where
let block_reward = block_reward(base_block_reward, ommers_cnt);
traces.push(reward_trace(
block_hash,
header,
RewardAction {
author: header.beneficiary(),
@@ -316,6 +318,7 @@ where
for uncle in ommers {
let uncle_reward = ommer_reward(base_block_reward, header.number(), uncle.number());
traces.push(reward_trace(
block_hash,
header,
RewardAction {
author: uncle.beneficiary(),
@@ -428,6 +431,7 @@ where
all_traces.extend(
self.extract_reward_traces(
block.header(),
block.hash(),
block.body().ommers(),
base_block_reward,
)
@@ -502,6 +506,7 @@ where
{
traces.extend(self.extract_reward_traces(
block.header(),
block.hash(),
block.body().ommers(),
base_block_reward,
));
@@ -795,9 +800,13 @@ pub struct BlockStorageAccess {
/// Helper to construct a [`LocalizedTransactionTrace`] that describes a reward to the block
/// beneficiary.
fn reward_trace<H: BlockHeader>(header: &H, reward: RewardAction) -> LocalizedTransactionTrace {
fn reward_trace<H: BlockHeader>(
block_hash: BlockHash,
header: &H,
reward: RewardAction,
) -> LocalizedTransactionTrace {
LocalizedTransactionTrace {
block_hash: Some(header.hash_slow()),
block_hash: Some(block_hash),
block_number: Some(header.number()),
transaction_hash: None,
transaction_position: None,

View File

@@ -378,7 +378,7 @@ impl<N: ProviderNodeTypes> Pipeline<N> {
});
}
// update finalized block if needed
// update finalized and safe block if needed
let last_saved_finalized_block_number =
provider_rw.last_finalized_block_number()?;
@@ -392,6 +392,16 @@ impl<N: ProviderNodeTypes> Pipeline<N> {
))?;
}
let last_saved_safe_block_number = provider_rw.last_safe_block_number()?;
if last_saved_safe_block_number.is_none() ||
Some(checkpoint.block_number) < last_saved_safe_block_number
{
provider_rw.save_safe_block_number(BlockNumber::from(
checkpoint.block_number,
))?;
}
provider_rw.commit()?;
stage.post_unwind_commit()?;

View File

@@ -121,5 +121,3 @@ test-utils = [
"reth-evm-ethereum/test-utils",
"reth-tasks/test-utils",
]
rocksdb = ["reth-provider/rocksdb", "reth-db-common/rocksdb"]
edge = ["rocksdb"]

View File

@@ -1,9 +1,7 @@
use super::collect_account_history_indices;
use crate::stages::utils::{collect_history_indices, load_account_history};
use reth_config::config::{EtlConfig, IndexHistoryConfig};
#[cfg(all(unix, feature = "rocksdb"))]
use reth_db_api::Tables;
use reth_db_api::{models::ShardedKey, tables, transaction::DbTxMut};
use reth_db_api::{models::ShardedKey, tables, transaction::DbTxMut, Tables};
use reth_provider::{
DBProvider, EitherWriter, HistoryWriter, PruneCheckpointReader, PruneCheckpointWriter,
RocksDBProviderFactory, StorageSettingsCache,
@@ -144,7 +142,6 @@ where
Ok(((), writer.into_raw_rocksdb_batch()))
})?;
#[cfg(all(unix, feature = "rocksdb"))]
if use_rocksdb {
provider.commit_pending_rocksdb_batches()?;
provider.rocksdb_provider().flush(&[Tables::AccountsHistory.name()])?;
@@ -663,7 +660,6 @@ mod tests {
}
}
#[cfg(all(unix, feature = "rocksdb"))]
mod rocksdb_tests {
use super::*;
use reth_provider::{

View File

@@ -1,12 +1,11 @@
use super::{collect_history_indices, collect_storage_history_indices};
use crate::{stages::utils::load_storage_history, StageCheckpoint, StageId};
use reth_config::config::{EtlConfig, IndexHistoryConfig};
#[cfg(all(unix, feature = "rocksdb"))]
use reth_db_api::Tables;
use reth_db_api::{
models::{storage_sharded_key::StorageShardedKey, AddressStorageKey, BlockNumberAddress},
tables,
transaction::DbTxMut,
Tables,
};
use reth_provider::{
DBProvider, EitherWriter, HistoryWriter, PruneCheckpointReader, PruneCheckpointWriter,
@@ -148,7 +147,6 @@ where
Ok(((), writer.into_raw_rocksdb_batch()))
})?;
#[cfg(all(unix, feature = "rocksdb"))]
if use_rocksdb {
provider.commit_pending_rocksdb_batches()?;
provider.rocksdb_provider().flush(&[Tables::StoragesHistory.name()])?;
@@ -691,7 +689,6 @@ mod tests {
}
}
#[cfg(all(unix, feature = "rocksdb"))]
mod rocksdb_tests {
use super::*;
use reth_db_api::models::StorageBeforeTx;

View File

@@ -2,12 +2,11 @@ use alloy_eips::eip2718::Encodable2718;
use alloy_primitives::{TxHash, TxNumber};
use num_traits::Zero;
use reth_config::config::{EtlConfig, TransactionLookupConfig};
#[cfg(all(unix, feature = "rocksdb"))]
use reth_db_api::Tables;
use reth_db_api::{
table::{Decode, Decompress, Value},
tables,
transaction::DbTxMut,
Tables,
};
use reth_etl::Collector;
use reth_primitives_traits::{NodePrimitives, SignedTransaction};
@@ -199,7 +198,6 @@ where
}
}
#[cfg(all(unix, feature = "rocksdb"))]
if provider.cached_storage_settings().storage_v2 {
provider.commit_pending_rocksdb_batches()?;
provider.rocksdb_provider().flush(&[Tables::TransactionHashNumbers.name()])?;
@@ -601,7 +599,6 @@ mod tests {
}
}
#[cfg(all(unix, feature = "rocksdb"))]
mod rocksdb_tests {
use super::*;
use reth_provider::RocksDBProviderFactory;

View File

@@ -36,6 +36,3 @@ reth-testing-utils.workspace = true
assert_matches.workspace = true
tempfile.workspace = true
[features]
edge = ["reth-stages/edge"]

View File

@@ -93,5 +93,3 @@ op = [
"reth-codecs/op",
"reth-primitives-traits/op",
]
rocksdb = []
edge = ["rocksdb"]

View File

@@ -28,20 +28,8 @@ pub struct StorageSettings {
impl StorageSettings {
/// Returns the default base `StorageSettings`.
///
/// When the `edge` feature is enabled, returns [`Self::v2()`] so that CI and
/// edge builds automatically use v2 storage defaults. Otherwise returns
/// [`Self::v1()`]. The `--storage.v2` CLI flag can also opt into v2 at runtime
/// regardless of feature flags.
pub const fn base() -> Self {
#[cfg(feature = "edge")]
{
Self::v2()
}
#[cfg(not(feature = "edge"))]
{
Self::v1()
}
Self::v2()
}
/// Creates `StorageSettings` for v2 nodes with all storage features enabled:

View File

@@ -46,9 +46,5 @@ reth-db = { workspace = true, features = ["mdbx"] }
reth-provider = { workspace = true, features = ["test-utils"] }
reth-tasks.workspace = true
[features]
rocksdb = ["reth-db-api/rocksdb", "reth-provider/rocksdb"]
edge = ["rocksdb"]
[lints]
workspace = true

View File

@@ -10,12 +10,7 @@ use alloy_primitives::{
use reth_chainspec::EthChainSpec;
use reth_codecs::Compact;
use reth_config::config::EtlConfig;
use reth_db_api::{
models::{storage_sharded_key::StorageShardedKey, ShardedKey},
tables,
transaction::DbTxMut,
BlockNumberList, DatabaseError,
};
use reth_db_api::{tables, transaction::DbTxMut, DatabaseError};
use reth_etl::Collector;
use reth_execution_errors::StateRootError;
use reth_primitives_traits::{
@@ -23,9 +18,9 @@ use reth_primitives_traits::{
};
use reth_provider::{
errors::provider::ProviderResult, providers::StaticFileWriter, BlockHashReader, BlockNumReader,
BundleStateInit, ChainSpecProvider, DBProvider, DatabaseProviderFactory, EitherWriter,
ExecutionOutcome, HashingWriter, HeaderProvider, HistoryWriter, MetadataProvider,
MetadataWriter, NodePrimitivesProvider, OriginalValuesKnown, ProviderError, RevertsInit,
BundleStateInit, ChainSpecProvider, DBProvider, DatabaseProviderFactory, ExecutionOutcome,
HashingWriter, HeaderProvider, HistoryWriter, MetadataProvider, MetadataWriter,
NodePrimitivesProvider, OriginalValuesKnown, ProviderError, RevertsInit,
RocksDBProviderFactory, StageCheckpointReader, StageCheckpointWriter, StateWriteConfig,
StateWriter, StaticFileProviderFactory, StorageSettings, StorageSettingsCache, TrieWriter,
};
@@ -46,6 +41,11 @@ use serde::{Deserialize, Serialize};
use std::io::BufRead;
use tracing::{debug, error, info, trace, warn};
pub use reth_provider::init::{
insert_account_history, insert_genesis_account_history, insert_genesis_history,
insert_genesis_storage_history, insert_history, insert_storage_history,
};
/// Default soft limit for number of bytes to read from state dump file, before inserting into
/// database.
///
@@ -415,137 +415,6 @@ where
Ok(())
}
/// Inserts history indices for genesis accounts and storage.
///
/// Writes to either MDBX or `RocksDB` based on storage settings configuration,
/// using [`EitherWriter`] to abstract over the storage backend.
pub fn insert_genesis_history<'a, 'b, Provider>(
provider: &Provider,
alloc: impl Iterator<Item = (&'a Address, &'b GenesisAccount)> + Clone,
) -> ProviderResult<()>
where
Provider: DBProvider<Tx: DbTxMut>
+ HistoryWriter
+ ChainSpecProvider
+ StorageSettingsCache
+ RocksDBProviderFactory
+ NodePrimitivesProvider,
{
let genesis_block_number = provider.chain_spec().genesis_header().number();
insert_history(provider, alloc, genesis_block_number)
}
/// Inserts account history indices for genesis accounts.
pub fn insert_genesis_account_history<'a, 'b, Provider>(
provider: &Provider,
alloc: impl Iterator<Item = (&'a Address, &'b GenesisAccount)>,
) -> ProviderResult<()>
where
Provider: DBProvider<Tx: DbTxMut>
+ HistoryWriter
+ ChainSpecProvider
+ StorageSettingsCache
+ RocksDBProviderFactory
+ NodePrimitivesProvider,
{
let genesis_block_number = provider.chain_spec().genesis_header().number();
insert_account_history(provider, alloc, genesis_block_number)
}
/// Inserts storage history indices for genesis accounts.
pub fn insert_genesis_storage_history<'a, 'b, Provider>(
provider: &Provider,
alloc: impl Iterator<Item = (&'a Address, &'b GenesisAccount)>,
) -> ProviderResult<()>
where
Provider: DBProvider<Tx: DbTxMut>
+ HistoryWriter
+ ChainSpecProvider
+ StorageSettingsCache
+ RocksDBProviderFactory
+ NodePrimitivesProvider,
{
let genesis_block_number = provider.chain_spec().genesis_header().number();
insert_storage_history(provider, alloc, genesis_block_number)
}
/// Inserts history indices for genesis accounts and storage.
///
/// Writes to either MDBX or `RocksDB` based on storage settings configuration,
/// using [`EitherWriter`] to abstract over the storage backend.
pub fn insert_history<'a, 'b, Provider>(
provider: &Provider,
alloc: impl Iterator<Item = (&'a Address, &'b GenesisAccount)> + Clone,
block: u64,
) -> ProviderResult<()>
where
Provider: DBProvider<Tx: DbTxMut>
+ HistoryWriter
+ StorageSettingsCache
+ RocksDBProviderFactory
+ NodePrimitivesProvider,
{
insert_account_history(provider, alloc.clone(), block)?;
insert_storage_history(provider, alloc, block)?;
Ok(())
}
/// Inserts account history indices at the given block.
pub fn insert_account_history<'a, 'b, Provider>(
provider: &Provider,
alloc: impl Iterator<Item = (&'a Address, &'b GenesisAccount)>,
block: u64,
) -> ProviderResult<()>
where
Provider: DBProvider<Tx: DbTxMut>
+ HistoryWriter
+ StorageSettingsCache
+ RocksDBProviderFactory
+ NodePrimitivesProvider,
{
provider.with_rocksdb_batch(|batch| {
let mut writer = EitherWriter::new_accounts_history(provider, batch)?;
let list = BlockNumberList::new([block]).expect("single block always fits");
for (addr, _) in alloc {
writer.upsert_account_history(ShardedKey::last(*addr), &list)?;
}
trace!(target: "reth::cli", "Inserted account history");
Ok(((), writer.into_raw_rocksdb_batch()))
})?;
Ok(())
}
/// Inserts storage history indices at the given block.
pub fn insert_storage_history<'a, 'b, Provider>(
provider: &Provider,
alloc: impl Iterator<Item = (&'a Address, &'b GenesisAccount)>,
block: u64,
) -> ProviderResult<()>
where
Provider: DBProvider<Tx: DbTxMut>
+ HistoryWriter
+ StorageSettingsCache
+ RocksDBProviderFactory
+ NodePrimitivesProvider,
{
provider.with_rocksdb_batch(|batch| {
let mut writer = EitherWriter::new_storages_history(provider, batch)?;
let list = BlockNumberList::new([block]).expect("single block always fits");
for (addr, account) in alloc {
if let Some(storage) = &account.storage {
for key in storage.keys() {
writer.upsert_storage_history(StorageShardedKey::last(*addr, *key), &list)?;
}
}
}
trace!(target: "reth::cli", "Inserted storage history");
Ok(((), writer.into_raw_rocksdb_batch()))
})?;
Ok(())
}
/// Inserts header for the genesis state.
pub fn insert_genesis_header<Provider, Spec>(
provider: &Provider,
@@ -633,7 +502,7 @@ where
.ok_or_else(|| eyre::eyre!("Block hash not found for block {}", block))?;
let header = provider_rw
.header_by_number(block)?
.map(SealedHeader::seal_slow)
.map(|h| SealedHeader::new(h, hash))
.ok_or_else(|| ProviderError::HeaderNotFound(block.into()))?;
let expected_state_root = header.state_root();
@@ -1051,7 +920,6 @@ mod tests {
)
};
#[cfg(feature = "rocksdb")]
{
let settings = factory.cached_storage_settings();
let rocksdb = factory.rocksdb_provider();
@@ -1079,13 +947,6 @@ mod tests {
assert_eq!(accounts, expected_accounts);
assert_eq!(storages, expected_storages);
}
#[cfg(not(feature = "rocksdb"))]
{
let (accounts, storages) = collect_from_mdbx(&factory);
assert_eq!(accounts, expected_accounts);
assert_eq!(storages, expected_storages);
}
}
#[test]

View File

@@ -36,6 +36,7 @@ reth-fs-util.workspace = true
# ethereum
alloy-eips.workspace = true
alloy-genesis.workspace = true
alloy-primitives.workspace = true
alloy-rpc-types-engine.workspace = true
alloy-consensus.workspace = true
@@ -63,9 +64,7 @@ tokio = { workspace = true, features = ["sync"], optional = true }
# parallel utils
rayon.workspace = true
[target.'cfg(unix)'.dependencies]
# rocksdb: jemalloc is recommended production workload
rocksdb = { workspace = true, features = ["jemalloc"], optional = true }
rocksdb.workspace = true
[dev-dependencies]
reth-db = { workspace = true, features = ["test-utils"] }
@@ -87,8 +86,7 @@ rand.workspace = true
tokio = { workspace = true, features = ["sync", "macros", "rt-multi-thread"] }
[features]
rocksdb = ["reth-storage-api/rocksdb", "dep:rocksdb"]
edge = ["rocksdb"]
jemalloc = ["rocksdb/jemalloc"]
test-utils = [
"reth-db/test-utils",
"reth-nippy-jar/test-utils",

View File

@@ -7,10 +7,11 @@ use std::{
ops::{Range, RangeInclusive},
};
#[cfg(all(unix, feature = "rocksdb"))]
use crate::providers::rocksdb::RocksDBBatch;
use crate::{
providers::{history_info, HistoryInfo, StaticFileProvider, StaticFileProviderRWRefMut},
providers::{
history_info, rocksdb::RocksDBBatch, HistoryInfo, StaticFileProvider,
StaticFileProviderRWRefMut,
},
StaticFileProviderFactory,
};
use alloy_primitives::{map::HashMap, Address, BlockNumber, TxHash, TxNumber, B256};
@@ -62,40 +63,16 @@ type EitherWriterTy<'a, P, T> = EitherWriter<
>;
/// Helper type for `RocksDB` batch argument in writer constructors.
///
/// When `rocksdb` feature is enabled, this is a real `RocksDB` batch.
/// Otherwise, it's `()` (unit type) to allow the same API without feature gates.
#[cfg(all(unix, feature = "rocksdb"))]
pub type RocksBatchArg<'a> = crate::providers::rocksdb::RocksDBBatch<'a>;
/// Helper type for `RocksDB` batch argument in writer constructors.
///
/// When `rocksdb` feature is enabled, this is a real `RocksDB` batch.
/// Otherwise, it's `()` (unit type) to allow the same API without feature gates.
#[cfg(not(all(unix, feature = "rocksdb")))]
pub type RocksBatchArg<'a> = ();
/// The raw `RocksDB` batch type returned by [`EitherWriter::into_raw_rocksdb_batch`].
#[cfg(all(unix, feature = "rocksdb"))]
pub type RawRocksDBBatch = rocksdb::WriteBatchWithTransaction<true>;
/// The raw `RocksDB` batch type returned by [`EitherWriter::into_raw_rocksdb_batch`].
#[cfg(not(all(unix, feature = "rocksdb")))]
pub type RawRocksDBBatch = ();
/// Helper type for `RocksDB` transaction reference argument in reader constructors.
/// Helper type for `RocksDB` snapshot argument in reader constructors.
///
/// When `rocksdb` feature is enabled, this is an optional reference to a `RocksDB` transaction.
/// The `Option` allows callers to skip transaction creation when `RocksDB` isn't needed
/// The `Option` allows callers to skip `RocksDB` access when it isn't needed
/// (e.g., on legacy MDBX-only nodes).
/// When `rocksdb` feature is disabled, it's `()` (unit type) to allow the same API without
/// feature gates.
#[cfg(all(unix, feature = "rocksdb"))]
pub type RocksTxRefArg<'a> = Option<&'a crate::providers::rocksdb::RocksTx<'a>>;
/// Helper type for `RocksDB` transaction reference argument in reader constructors.
///
/// When `rocksdb` feature is disabled, it's `()` (unit type) to allow the same API without
/// feature gates.
#[cfg(not(all(unix, feature = "rocksdb")))]
pub type RocksTxRefArg<'a> = ();
pub type RocksDBRefArg<'a> = Option<crate::providers::rocksdb::RocksReadSnapshot<'a>>;
/// Represents a destination for writing data, either to database, static files, or `RocksDB`.
#[derive(Debug, Display)]
@@ -105,7 +82,6 @@ pub enum EitherWriter<'a, CURSOR, N> {
/// Write to static file
StaticFile(StaticFileProviderRWRefMut<'a, N>),
/// Write to `RocksDB` using a write-only batch (historical tables).
#[cfg(all(unix, feature = "rocksdb"))]
RocksDB(RocksDBBatch<'a>),
}
@@ -254,7 +230,6 @@ impl<'a> EitherWriter<'a, (), ()> {
P: DBProvider + NodePrimitivesProvider + StorageSettingsCache,
P::Tx: DbTxMut,
{
#[cfg(all(unix, feature = "rocksdb"))]
if provider.cached_storage_settings().storage_v2 {
return Ok(EitherWriter::RocksDB(_rocksdb_batch));
}
@@ -271,7 +246,6 @@ impl<'a> EitherWriter<'a, (), ()> {
P: DBProvider + NodePrimitivesProvider + StorageSettingsCache,
P::Tx: DbTxMut,
{
#[cfg(all(unix, feature = "rocksdb"))]
if provider.cached_storage_settings().storage_v2 {
return Ok(EitherWriter::RocksDB(_rocksdb_batch));
}
@@ -290,7 +264,6 @@ impl<'a> EitherWriter<'a, (), ()> {
P: DBProvider + NodePrimitivesProvider + StorageSettingsCache,
P::Tx: DbTxMut,
{
#[cfg(all(unix, feature = "rocksdb"))]
if provider.cached_storage_settings().storage_v2 {
return Ok(EitherWriter::RocksDB(_rocksdb_batch));
}
@@ -307,7 +280,6 @@ impl<'a, CURSOR, N: NodePrimitives> EitherWriter<'a, CURSOR, N> {
///
/// This is used to defer `RocksDB` commits to the provider level, ensuring all
/// storage commits (MDBX, static files, `RocksDB`) happen atomically in a single place.
#[cfg(all(unix, feature = "rocksdb"))]
pub fn into_raw_rocksdb_batch(self) -> Option<rocksdb::WriteBatchWithTransaction<true>> {
match self {
Self::Database(_) | Self::StaticFile(_) => None,
@@ -315,16 +287,6 @@ impl<'a, CURSOR, N: NodePrimitives> EitherWriter<'a, CURSOR, N> {
}
}
/// Extracts the raw `RocksDB` write batch from this writer, if it contains one.
///
/// Without the `rocksdb` feature, this always returns `None`.
#[cfg(not(all(unix, feature = "rocksdb")))]
pub fn into_raw_rocksdb_batch(self) -> Option<RawRocksDBBatch> {
match self {
Self::Database(_) | Self::StaticFile(_) => None,
}
}
/// Increment the block number.
///
/// Relevant only for [`Self::StaticFile`]. It is a no-op for [`Self::Database`].
@@ -332,7 +294,6 @@ impl<'a, CURSOR, N: NodePrimitives> EitherWriter<'a, CURSOR, N> {
match self {
Self::Database(_) => Ok(()),
Self::StaticFile(writer) => writer.increment_block(expected_block_number),
#[cfg(all(unix, feature = "rocksdb"))]
Self::RocksDB(_) => Err(ProviderError::UnsupportedProvider),
}
}
@@ -347,7 +308,6 @@ impl<'a, CURSOR, N: NodePrimitives> EitherWriter<'a, CURSOR, N> {
match self {
Self::Database(_) => Ok(()),
Self::StaticFile(writer) => writer.ensure_at_block(block_number),
#[cfg(all(unix, feature = "rocksdb"))]
Self::RocksDB(_) => Err(ProviderError::UnsupportedProvider),
}
}
@@ -363,7 +323,6 @@ where
match self {
Self::Database(cursor) => Ok(cursor.append(tx_num, receipt)?),
Self::StaticFile(writer) => writer.append_receipt(tx_num, receipt),
#[cfg(all(unix, feature = "rocksdb"))]
Self::RocksDB(_) => Err(ProviderError::UnsupportedProvider),
}
}
@@ -378,7 +337,6 @@ where
match self {
Self::Database(cursor) => Ok(cursor.append(tx_num, sender)?),
Self::StaticFile(writer) => writer.append_transaction_sender(tx_num, sender),
#[cfg(all(unix, feature = "rocksdb"))]
Self::RocksDB(_) => Err(ProviderError::UnsupportedProvider),
}
}
@@ -396,7 +354,6 @@ where
Ok(())
}
Self::StaticFile(writer) => writer.append_transaction_senders(senders),
#[cfg(all(unix, feature = "rocksdb"))]
Self::RocksDB(_) => Err(ProviderError::UnsupportedProvider),
}
}
@@ -429,7 +386,6 @@ where
writer.prune_transaction_senders(to_delete, block)?;
}
#[cfg(all(unix, feature = "rocksdb"))]
Self::RocksDB(_) => return Err(ProviderError::UnsupportedProvider),
}
@@ -461,7 +417,6 @@ where
}
}
Self::StaticFile(_) => Err(ProviderError::UnsupportedProvider),
#[cfg(all(unix, feature = "rocksdb"))]
Self::RocksDB(batch) => batch.put::<tables::TransactionHashNumbers>(hash, &tx_num),
}
}
@@ -491,7 +446,6 @@ where
Ok(())
}
Self::StaticFile(_) => Err(ProviderError::UnsupportedProvider),
#[cfg(all(unix, feature = "rocksdb"))]
Self::RocksDB(batch) => {
for (hash, tx_num) in entries {
batch.put::<tables::TransactionHashNumbers>(hash, &tx_num)?;
@@ -511,7 +465,6 @@ where
Ok(())
}
Self::StaticFile(_) => Err(ProviderError::UnsupportedProvider),
#[cfg(all(unix, feature = "rocksdb"))]
Self::RocksDB(batch) => batch.delete::<tables::TransactionHashNumbers>(hash),
}
}
@@ -530,7 +483,6 @@ where
match self {
Self::Database(cursor) => Ok(cursor.upsert(key, value)?),
Self::StaticFile(_) => Err(ProviderError::UnsupportedProvider),
#[cfg(all(unix, feature = "rocksdb"))]
Self::RocksDB(batch) => batch.put::<tables::StoragesHistory>(key, value),
}
}
@@ -545,7 +497,6 @@ where
Ok(())
}
Self::StaticFile(_) => Err(ProviderError::UnsupportedProvider),
#[cfg(all(unix, feature = "rocksdb"))]
Self::RocksDB(batch) => batch.delete::<tables::StoragesHistory>(key),
}
}
@@ -559,7 +510,6 @@ where
match self {
Self::Database(cursor) => Ok(cursor.append(key, value)?),
Self::StaticFile(_) => Err(ProviderError::UnsupportedProvider),
#[cfg(all(unix, feature = "rocksdb"))]
Self::RocksDB(batch) => batch.put::<tables::StoragesHistory>(key, value),
}
}
@@ -573,7 +523,6 @@ where
match self {
Self::Database(cursor) => Ok(cursor.upsert(key, value)?),
Self::StaticFile(_) => Err(ProviderError::UnsupportedProvider),
#[cfg(all(unix, feature = "rocksdb"))]
Self::RocksDB(batch) => batch.put::<tables::StoragesHistory>(key, value),
}
}
@@ -588,7 +537,6 @@ where
match self {
Self::Database(cursor) => Ok(cursor.seek_exact(key)?.map(|(_, v)| v)),
Self::StaticFile(_) => Err(ProviderError::UnsupportedProvider),
#[cfg(all(unix, feature = "rocksdb"))]
Self::RocksDB(batch) => batch.get::<tables::StoragesHistory>(key),
}
}
@@ -607,7 +555,6 @@ where
match self {
Self::Database(cursor) => Ok(cursor.append(key, value)?),
Self::StaticFile(_) => Err(ProviderError::UnsupportedProvider),
#[cfg(all(unix, feature = "rocksdb"))]
Self::RocksDB(batch) => batch.put::<tables::AccountsHistory>(key, value),
}
}
@@ -621,7 +568,6 @@ where
match self {
Self::Database(cursor) => Ok(cursor.upsert(key, value)?),
Self::StaticFile(_) => Err(ProviderError::UnsupportedProvider),
#[cfg(all(unix, feature = "rocksdb"))]
Self::RocksDB(batch) => batch.put::<tables::AccountsHistory>(key, value),
}
}
@@ -636,7 +582,6 @@ where
Ok(cursor.seek_exact(ShardedKey::last(address))?.map(|(_, v)| v))
}
Self::StaticFile(_) => Err(ProviderError::UnsupportedProvider),
#[cfg(all(unix, feature = "rocksdb"))]
Self::RocksDB(batch) => batch.get::<tables::AccountsHistory>(ShardedKey::last(address)),
}
}
@@ -651,7 +596,6 @@ where
Ok(())
}
Self::StaticFile(_) => Err(ProviderError::UnsupportedProvider),
#[cfg(all(unix, feature = "rocksdb"))]
Self::RocksDB(batch) => batch.delete::<tables::AccountsHistory>(key),
}
}
@@ -680,7 +624,6 @@ where
Self::StaticFile(writer) => {
writer.append_account_changeset(changeset, block_number)?;
}
#[cfg(all(unix, feature = "rocksdb"))]
Self::RocksDB(_) => return Err(ProviderError::UnsupportedProvider),
}
@@ -715,7 +658,6 @@ where
Self::StaticFile(writer) => {
writer.append_storage_changeset(changeset, block_number)?;
}
#[cfg(all(unix, feature = "rocksdb"))]
Self::RocksDB(_) => return Err(ProviderError::UnsupportedProvider),
}
@@ -730,9 +672,8 @@ pub enum EitherReader<'a, CURSOR, N> {
Database(CURSOR, PhantomData<&'a ()>),
/// Read from static file
StaticFile(StaticFileProvider<N>, PhantomData<&'a ()>),
/// Read from `RocksDB` transaction
#[cfg(all(unix, feature = "rocksdb"))]
RocksDB(&'a crate::providers::rocksdb::RocksTx<'a>),
/// Read from `RocksDB` snapshot (works in both read-only and read-write modes)
RocksDB(crate::providers::rocksdb::RocksReadSnapshot<'a>),
}
impl<'a> EitherReader<'a, (), ()> {
@@ -757,16 +698,15 @@ impl<'a> EitherReader<'a, (), ()> {
/// Creates a new [`EitherReader`] for storages history based on storage settings.
pub fn new_storages_history<P>(
provider: &P,
_rocksdb_tx: RocksTxRefArg<'a>,
rocksdb: RocksDBRefArg<'a>,
) -> ProviderResult<EitherReaderTy<'a, P, tables::StoragesHistory>>
where
P: DBProvider + NodePrimitivesProvider + StorageSettingsCache,
P::Tx: DbTx,
{
#[cfg(all(unix, feature = "rocksdb"))]
if provider.cached_storage_settings().storage_v2 {
return Ok(EitherReader::RocksDB(
_rocksdb_tx.expect("storages_history_in_rocksdb requires rocksdb tx"),
rocksdb.expect("storages_history_in_rocksdb requires rocksdb snapshot"),
));
}
@@ -779,16 +719,15 @@ impl<'a> EitherReader<'a, (), ()> {
/// Creates a new [`EitherReader`] for transaction hash numbers based on storage settings.
pub fn new_transaction_hash_numbers<P>(
provider: &P,
_rocksdb_tx: RocksTxRefArg<'a>,
rocksdb: RocksDBRefArg<'a>,
) -> ProviderResult<EitherReaderTy<'a, P, tables::TransactionHashNumbers>>
where
P: DBProvider + NodePrimitivesProvider + StorageSettingsCache,
P::Tx: DbTx,
{
#[cfg(all(unix, feature = "rocksdb"))]
if provider.cached_storage_settings().storage_v2 {
return Ok(EitherReader::RocksDB(
_rocksdb_tx.expect("transaction_hash_numbers_in_rocksdb requires rocksdb tx"),
rocksdb.expect("transaction_hash_numbers_in_rocksdb requires rocksdb snapshot"),
));
}
@@ -801,16 +740,15 @@ impl<'a> EitherReader<'a, (), ()> {
/// Creates a new [`EitherReader`] for account history based on storage settings.
pub fn new_accounts_history<P>(
provider: &P,
_rocksdb_tx: RocksTxRefArg<'a>,
rocksdb: RocksDBRefArg<'a>,
) -> ProviderResult<EitherReaderTy<'a, P, tables::AccountsHistory>>
where
P: DBProvider + NodePrimitivesProvider + StorageSettingsCache,
P::Tx: DbTx,
{
#[cfg(all(unix, feature = "rocksdb"))]
if provider.cached_storage_settings().storage_v2 {
return Ok(EitherReader::RocksDB(
_rocksdb_tx.expect("account_history_in_rocksdb requires rocksdb tx"),
rocksdb.expect("account_history_in_rocksdb requires rocksdb snapshot"),
));
}
@@ -865,7 +803,6 @@ where
Some(result.map(|sender| (tx_num, sender)))
})
.collect::<ProviderResult<HashMap<_, _>>>(),
#[cfg(all(unix, feature = "rocksdb"))]
Self::RocksDB(_) => Err(ProviderError::UnsupportedProvider),
}
}
@@ -883,8 +820,7 @@ where
match self {
Self::Database(cursor, _) => Ok(cursor.seek_exact(hash)?.map(|(_, v)| v)),
Self::StaticFile(_, _) => Err(ProviderError::UnsupportedProvider),
#[cfg(all(unix, feature = "rocksdb"))]
Self::RocksDB(tx) => tx.get::<tables::TransactionHashNumbers>(hash),
Self::RocksDB(snapshot) => snapshot.get::<tables::TransactionHashNumbers>(hash),
}
}
}
@@ -901,8 +837,7 @@ where
match self {
Self::Database(cursor, _) => Ok(cursor.seek_exact(key)?.map(|(_, v)| v)),
Self::StaticFile(_, _) => Err(ProviderError::UnsupportedProvider),
#[cfg(all(unix, feature = "rocksdb"))]
Self::RocksDB(tx) => tx.get::<tables::StoragesHistory>(key),
Self::RocksDB(snapshot) => snapshot.get::<tables::StoragesHistory>(key),
}
}
@@ -926,8 +861,7 @@ where
)
}
Self::StaticFile(_, _) => Err(ProviderError::UnsupportedProvider),
#[cfg(all(unix, feature = "rocksdb"))]
Self::RocksDB(tx) => tx.storage_history_info(
Self::RocksDB(snapshot) => snapshot.storage_history_info(
address,
storage_key,
block_number,
@@ -949,8 +883,7 @@ where
match self {
Self::Database(cursor, _) => Ok(cursor.seek_exact(key)?.map(|(_, v)| v)),
Self::StaticFile(_, _) => Err(ProviderError::UnsupportedProvider),
#[cfg(all(unix, feature = "rocksdb"))]
Self::RocksDB(tx) => tx.get::<tables::AccountsHistory>(key),
Self::RocksDB(snapshot) => snapshot.get::<tables::AccountsHistory>(key),
}
}
@@ -973,9 +906,8 @@ where
)
}
Self::StaticFile(_, _) => Err(ProviderError::UnsupportedProvider),
#[cfg(all(unix, feature = "rocksdb"))]
Self::RocksDB(tx) => {
tx.account_history_info(address, block_number, lowest_available_block_number)
Self::RocksDB(snapshot) => {
snapshot.account_history_info(address, block_number, lowest_available_block_number)
}
}
}
@@ -1022,7 +954,6 @@ where
entry.map(|(_, account_before)| account_before.address).map_err(Into::into)
})
.collect(),
#[cfg(all(unix, feature = "rocksdb"))]
Self::RocksDB(_) => Err(ProviderError::UnsupportedProvider),
}
}
@@ -1195,7 +1126,7 @@ mod tests {
}
}
#[cfg(all(test, unix, feature = "rocksdb"))]
#[cfg(test)]
mod rocksdb_tests {
use super::*;
use crate::{
@@ -1497,7 +1428,7 @@ mod rocksdb_tests {
// Run queries against both backends using EitherReader
let mdbx_ro = factory.database_provider_ro().unwrap();
let rocks_tx = rocks_provider.tx();
let rocks_snapshot = rocks_provider.snapshot();
for (i, query) in queries.iter().enumerate() {
// MDBX query via EitherReader
@@ -1510,10 +1441,8 @@ mod rocksdb_tests {
.account_history_info(address, query.block_number, query.lowest_available)
.unwrap();
// RocksDB query via EitherReader
let mut rocks_reader: EitherReader<'_, AccountsHistoryReadCursor, EthPrimitives> =
EitherReader::RocksDB(&rocks_tx);
let rocks_result = rocks_reader
// RocksDB query via EitherReader — reuse snapshot for consistent view
let rocks_result = rocks_snapshot
.account_history_info(address, query.block_number, query.lowest_available)
.unwrap();
@@ -1546,7 +1475,6 @@ mod rocksdb_tests {
);
}
rocks_tx.rollback().unwrap();
drop(temp_dir);
}
@@ -1589,7 +1517,7 @@ mod rocksdb_tests {
// Run queries against both backends using EitherReader
let mdbx_ro = factory.database_provider_ro().unwrap();
let rocks_tx = rocks_provider.tx();
let rocks_snapshot = rocks_provider.snapshot();
for (i, query) in queries.iter().enumerate() {
// MDBX query via EitherReader
@@ -1607,10 +1535,8 @@ mod rocksdb_tests {
)
.unwrap();
// RocksDB query via EitherReader
let mut rocks_reader: EitherReader<'_, StoragesHistoryReadCursor, EthPrimitives> =
EitherReader::RocksDB(&rocks_tx);
let rocks_result = rocks_reader
// RocksDB query via snapshot — reuse for consistent view
let rocks_result = rocks_snapshot
.storage_history_info(
address,
storage_key,
@@ -1648,7 +1574,6 @@ mod rocksdb_tests {
);
}
rocks_tx.rollback().unwrap();
drop(temp_dir);
}
@@ -1878,10 +1803,10 @@ mod rocksdb_tests {
}
/// Test that `EitherReader::new_accounts_history` panics when settings require
/// `RocksDB` but no tx is provided (`None`). This is an invariant violation that
/// indicates a bug - `with_rocksdb_tx` should always provide a tx when needed.
/// `RocksDB` but no snapshot is given (`None`). This is an invariant violation that
/// indicates a bug - `with_rocksdb_snapshot` should always provide a snapshot when needed.
#[test]
#[should_panic(expected = "account_history_in_rocksdb requires rocksdb tx")]
#[should_panic(expected = "account_history_in_rocksdb requires rocksdb snapshot")]
fn test_settings_mismatch_panics() {
let factory = create_test_provider_factory();

View File

@@ -0,0 +1,145 @@
use crate::{
ChainSpecProvider, DBProvider, EitherWriter, HistoryWriter, NodePrimitivesProvider,
ProviderResult, RocksDBProviderFactory, StorageSettingsCache,
};
use alloy_consensus::BlockHeader;
use alloy_genesis::GenesisAccount;
use alloy_primitives::Address;
use reth_chainspec::EthChainSpec;
use reth_db::{
models::{storage_sharded_key::StorageShardedKey, ShardedKey},
transaction::DbTxMut,
BlockNumberList,
};
use tracing::trace;
/// Inserts history indices for genesis accounts and storage.
///
/// Writes to either MDBX or `RocksDB` based on storage settings configuration,
/// using [`EitherWriter`] to abstract over the storage backend.
pub fn insert_genesis_history<'a, 'b, Provider>(
provider: &Provider,
alloc: impl Iterator<Item = (&'a Address, &'b GenesisAccount)> + Clone,
) -> ProviderResult<()>
where
Provider: DBProvider<Tx: DbTxMut>
+ HistoryWriter
+ ChainSpecProvider
+ StorageSettingsCache
+ RocksDBProviderFactory
+ NodePrimitivesProvider,
{
let genesis_block_number = provider.chain_spec().genesis_header().number();
insert_history(provider, alloc, genesis_block_number)
}
/// Inserts account history indices for genesis accounts.
pub fn insert_genesis_account_history<'a, 'b, Provider>(
provider: &Provider,
alloc: impl Iterator<Item = (&'a Address, &'b GenesisAccount)>,
) -> ProviderResult<()>
where
Provider: DBProvider<Tx: DbTxMut>
+ HistoryWriter
+ ChainSpecProvider
+ StorageSettingsCache
+ RocksDBProviderFactory
+ NodePrimitivesProvider,
{
let genesis_block_number = provider.chain_spec().genesis_header().number();
insert_account_history(provider, alloc, genesis_block_number)
}
/// Inserts storage history indices for genesis accounts.
pub fn insert_genesis_storage_history<'a, 'b, Provider>(
provider: &Provider,
alloc: impl Iterator<Item = (&'a Address, &'b GenesisAccount)>,
) -> ProviderResult<()>
where
Provider: DBProvider<Tx: DbTxMut>
+ HistoryWriter
+ ChainSpecProvider
+ StorageSettingsCache
+ RocksDBProviderFactory
+ NodePrimitivesProvider,
{
let genesis_block_number = provider.chain_spec().genesis_header().number();
insert_storage_history(provider, alloc, genesis_block_number)
}
/// Inserts history indices for genesis accounts and storage.
///
/// Writes to either MDBX or `RocksDB` based on storage settings configuration,
/// using [`EitherWriter`] to abstract over the storage backend.
pub fn insert_history<'a, 'b, Provider>(
provider: &Provider,
alloc: impl Iterator<Item = (&'a Address, &'b GenesisAccount)> + Clone,
block: u64,
) -> ProviderResult<()>
where
Provider: DBProvider<Tx: DbTxMut>
+ HistoryWriter
+ StorageSettingsCache
+ RocksDBProviderFactory
+ NodePrimitivesProvider,
{
insert_account_history(provider, alloc.clone(), block)?;
insert_storage_history(provider, alloc, block)?;
Ok(())
}
/// Inserts account history indices at the given block.
pub fn insert_account_history<'a, 'b, Provider>(
provider: &Provider,
alloc: impl Iterator<Item = (&'a Address, &'b GenesisAccount)>,
block: u64,
) -> ProviderResult<()>
where
Provider: DBProvider<Tx: DbTxMut>
+ HistoryWriter
+ StorageSettingsCache
+ RocksDBProviderFactory
+ NodePrimitivesProvider,
{
provider.with_rocksdb_batch(|batch| {
let mut writer = EitherWriter::new_accounts_history(provider, batch)?;
let list = BlockNumberList::new([block]).expect("single block always fits");
for (addr, _) in alloc {
writer.upsert_account_history(ShardedKey::last(*addr), &list)?;
}
trace!(target: "reth::provider", "Inserted account history");
Ok(((), writer.into_raw_rocksdb_batch()))
})?;
Ok(())
}
/// Inserts storage history indices at the given block.
pub fn insert_storage_history<'a, 'b, Provider>(
provider: &Provider,
alloc: impl Iterator<Item = (&'a Address, &'b GenesisAccount)>,
block: u64,
) -> ProviderResult<()>
where
Provider: DBProvider<Tx: DbTxMut>
+ HistoryWriter
+ StorageSettingsCache
+ RocksDBProviderFactory
+ NodePrimitivesProvider,
{
provider.with_rocksdb_batch(|batch| {
let mut writer = EitherWriter::new_storages_history(provider, batch)?;
let list = BlockNumberList::new([block]).expect("single block always fits");
for (addr, account) in alloc {
if let Some(storage) = &account.storage {
for key in storage.keys() {
writer.upsert_storage_history(StorageShardedKey::last(*addr, *key), &list)?;
}
}
}
trace!(target: "reth::cli", "Inserted storage history");
Ok(((), writer.into_raw_rocksdb_batch()))
})?;
Ok(())
}

View File

@@ -12,6 +12,9 @@
#![cfg_attr(not(test), warn(unused_crate_dependencies))]
#![cfg_attr(docsrs, feature(doc_cfg))]
/// Utility functions for initializing the database.
pub mod init;
/// Various provider traits.
mod traits;
pub use traits::*;

View File

@@ -182,12 +182,10 @@ impl<N: ProviderNodeTypes> RocksDBProviderFactory for BlockchainProvider<N> {
self.database.rocksdb_provider()
}
#[cfg(all(unix, feature = "rocksdb"))]
fn set_pending_rocksdb_batch(&self, _batch: rocksdb::WriteBatchWithTransaction<true>) {
unimplemented!("BlockchainProvider wraps ProviderFactory - use DatabaseProvider::set_pending_rocksdb_batch instead")
}
#[cfg(all(unix, feature = "rocksdb"))]
fn commit_pending_rocksdb_batches(&self) -> ProviderResult<()> {
unimplemented!("BlockchainProvider wraps ProviderFactory - use DatabaseProvider::commit_pending_rocksdb_batches instead")
}
@@ -557,7 +555,6 @@ impl<N: ProviderNodeTypes> StateProviderFactory for BlockchainProvider<N> {
) -> ProviderResult<StateProviderBox> {
trace!(target: "providers::blockchain", ?block_number, "Getting history by block number");
let provider = self.consistent_provider()?;
provider.ensure_canonical_block(block_number)?;
let hash = provider
.block_hash(block_number)?
.ok_or_else(|| ProviderError::HeaderNotFound(block_number.into()))?;

View File

@@ -600,21 +600,23 @@ impl<N: ProviderNodeTypes> ConsistentProvider<N> {
self,
block_hash: BlockHash,
) -> ProviderResult<StateProviderBox> {
// Resolve block number and verify it's canonical before destructuring self
let block_number =
self.block_number(block_hash)?.ok_or(ProviderError::BlockHashNotFound(block_hash))?;
self.ensure_canonical_block(block_number)?;
let Self { storage_provider, head_block, .. } = self;
let into_history_at_block_hash = |block_hash| -> ProviderResult<StateProviderBox> {
let block_number = storage_provider
.block_number(block_hash)?
.ok_or(ProviderError::BlockHashNotFound(block_hash))?;
storage_provider.try_into_history_at_block(block_number)
};
if let Some(Some(block_state)) =
head_block.as_ref().map(|b| b.block_on_chain(block_hash.into()))
{
let anchor_hash = block_state.anchor().hash;
let latest_historical = into_history_at_block_hash(anchor_hash)?;
let block_number = storage_provider
.block_number(anchor_hash)?
.ok_or(ProviderError::BlockHashNotFound(anchor_hash))?;
let latest_historical = storage_provider.try_into_history_at_block(block_number)?;
return Ok(Box::new(block_state.state_provider(latest_historical)));
}
into_history_at_block_hash(block_hash)
storage_provider.try_into_history_at_block(block_number)
}
}
@@ -1287,7 +1289,9 @@ impl<N: ProviderNodeTypes> BlockReaderIdExt for ConsistentProvider<N> {
) -> ProviderResult<Option<SealedHeader<HeaderTy<N>>>> {
Ok(match id {
BlockId::Number(num) => self.sealed_header_by_number_or_tag(num)?,
BlockId::Hash(hash) => self.header(hash.block_hash)?.map(SealedHeader::seal_slow),
BlockId::Hash(hash) => self
.header(hash.block_hash)?
.map(|header| SealedHeader::new(header, hash.block_hash)),
})
}

View File

@@ -90,7 +90,7 @@ pub(crate) struct DatabaseProviderMetrics {
/// Duration of `update_pipeline_stages` in `save_blocks`
save_blocks_update_pipeline_stages: Histogram,
/// Number of blocks per `save_blocks` call
save_blocks_block_count: Histogram,
save_blocks_batch_size: Histogram,
/// Duration of MDBX commit in `save_blocks`
save_blocks_commit_mdbx: Histogram,
/// Duration of static file commit in `save_blocks`
@@ -118,7 +118,7 @@ pub(crate) struct DatabaseProviderMetrics {
/// Last duration of `update_pipeline_stages` in `save_blocks`
save_blocks_update_pipeline_stages_last: Gauge,
/// Last number of blocks per `save_blocks` call
save_blocks_block_count_last: Gauge,
save_blocks_batch_size_last: Gauge,
/// Last duration of MDBX commit in `save_blocks`
save_blocks_commit_mdbx_last: Gauge,
/// Last duration of static file commit in `save_blocks`
@@ -140,7 +140,7 @@ pub(crate) struct SaveBlocksTimings {
pub write_trie_updates: Duration,
pub update_history_indices: Duration,
pub update_pipeline_stages: Duration,
pub block_count: u64,
pub batch_size: u64,
}
/// Timings collected during a `commit` call.
@@ -182,7 +182,7 @@ impl DatabaseProviderMetrics {
self.save_blocks_write_trie_updates.record(timings.write_trie_updates);
self.save_blocks_update_history_indices.record(timings.update_history_indices);
self.save_blocks_update_pipeline_stages.record(timings.update_pipeline_stages);
self.save_blocks_block_count.record(timings.block_count as f64);
self.save_blocks_batch_size.record(timings.batch_size as f64);
self.save_blocks_total_last.set(timings.total.as_secs_f64());
self.save_blocks_mdbx_last.set(timings.mdbx.as_secs_f64());
@@ -196,7 +196,7 @@ impl DatabaseProviderMetrics {
.set(timings.update_history_indices.as_secs_f64());
self.save_blocks_update_pipeline_stages_last
.set(timings.update_pipeline_stages.as_secs_f64());
self.save_blocks_block_count_last.set(timings.block_count as f64);
self.save_blocks_batch_size_last.set(timings.batch_size as f64);
}
/// Records all commit timings.

View File

@@ -24,12 +24,12 @@ use reth_node_types::{
BlockTy, HeaderTy, NodeTypesWithDB, NodeTypesWithDBAdapter, ReceiptTy, TxTy,
};
use reth_primitives_traits::{RecoveredBlock, SealedHeader};
use reth_prune_types::{PruneCheckpoint, PruneModes, PruneSegment};
use reth_prune_types::{PruneCheckpoint, PruneModes, PruneSegment, MINIMUM_UNWIND_SAFE_DISTANCE};
use reth_stages_types::{PipelineTarget, StageCheckpoint, StageId};
use reth_static_file_types::StaticFileSegment;
use reth_storage_api::{
BlockBodyIndicesProvider, NodePrimitivesProvider, StorageSettings, StorageSettingsCache,
TryIntoHistoricalStateProvider,
BlockBodyIndicesProvider, ChainStateBlockReader, ChainStateBlockWriter, DBProvider,
NodePrimitivesProvider, StorageSettings, StorageSettingsCache, TryIntoHistoricalStateProvider,
};
use reth_storage_errors::provider::ProviderResult;
use reth_trie::HashedPostState;
@@ -40,7 +40,7 @@ use std::{
path::Path,
sync::Arc,
};
use tracing::{instrument, trace};
use tracing::{info, instrument, trace};
mod provider;
pub use provider::{
@@ -80,6 +80,8 @@ pub struct ProviderFactory<N: NodeTypesWithDB> {
changeset_cache: ChangesetCache,
/// Task runtime for spawning parallel I/O work.
runtime: reth_tasks::Runtime,
/// Minimum distance from tip required before pruning can occur.
minimum_pruning_distance: u64,
}
impl<N: NodeTypesForProvider> ProviderFactory<NodeTypesWithDBAdapter<N, DatabaseEnv>> {
@@ -134,6 +136,7 @@ impl<N: ProviderNodeTypes> ProviderFactory<N> {
rocksdb_provider,
changeset_cache: ChangesetCache::new(),
runtime,
minimum_pruning_distance: MINIMUM_UNWIND_SAFE_DISTANCE,
})
}
@@ -168,6 +171,15 @@ impl<N: NodeTypesWithDB> ProviderFactory<N> {
self
}
/// Sets the minimum pruning distance for an existing [`ProviderFactory`].
///
/// This controls the minimum distance from tip required before pruning can occur.
/// The default is [`MINIMUM_UNWIND_SAFE_DISTANCE`].
pub const fn with_minimum_pruning_distance(mut self, distance: u64) -> Self {
self.minimum_pruning_distance = distance;
self
}
/// Returns reference to the underlying database.
pub const fn db_ref(&self) -> &N::DB {
&self.db
@@ -195,12 +207,10 @@ impl<N: NodeTypesWithDB> RocksDBProviderFactory for ProviderFactory<N> {
self.rocksdb_provider.clone()
}
#[cfg(all(unix, feature = "rocksdb"))]
fn set_pending_rocksdb_batch(&self, _batch: rocksdb::WriteBatchWithTransaction<true>) {
unimplemented!("ProviderFactory is a factory, not a provider - use DatabaseProvider::set_pending_rocksdb_batch instead")
}
#[cfg(all(unix, feature = "rocksdb"))]
fn commit_pending_rocksdb_batches(&self) -> ProviderResult<()> {
unimplemented!("ProviderFactory is a factory, not a provider - use DatabaseProvider::commit_pending_rocksdb_batches instead")
}
@@ -248,7 +258,8 @@ impl<N: ProviderNodeTypes> ProviderFactory<N> {
self.changeset_cache.clone(),
self.runtime.clone(),
self.db.path(),
))
)
.with_minimum_pruning_distance(self.minimum_pruning_distance))
}
/// Returns a provider with a created `DbTxMut` inside, which allows fetching and updating
@@ -257,18 +268,21 @@ impl<N: ProviderNodeTypes> ProviderFactory<N> {
/// open.
#[track_caller]
pub fn provider_rw(&self) -> ProviderResult<DatabaseProviderRW<N::DB, N>> {
Ok(DatabaseProviderRW(DatabaseProvider::new_rw(
self.db.tx_mut()?,
self.chain_spec.clone(),
self.static_file_provider.clone(),
self.prune_modes.clone(),
self.storage.clone(),
self.storage_settings.clone(),
self.rocksdb_provider.clone(),
self.changeset_cache.clone(),
self.runtime.clone(),
self.db.path(),
)))
Ok(DatabaseProviderRW(
DatabaseProvider::new_rw(
self.db.tx_mut()?,
self.chain_spec.clone(),
self.static_file_provider.clone(),
self.prune_modes.clone(),
self.storage.clone(),
self.storage_settings.clone(),
self.rocksdb_provider.clone(),
self.changeset_cache.clone(),
self.runtime.clone(),
self.db.path(),
)
.with_minimum_pruning_distance(self.minimum_pruning_distance),
))
}
/// Returns a provider with a created `DbTxMut` inside, configured for unwind operations.
@@ -289,7 +303,8 @@ impl<N: ProviderNodeTypes> ProviderFactory<N> {
self.changeset_cache.clone(),
self.runtime.clone(),
self.db.path(),
))
)
.with_minimum_pruning_distance(self.minimum_pruning_distance))
}
/// State provider for latest block
@@ -351,7 +366,12 @@ impl<N: ProviderNodeTypes> ProviderFactory<N> {
/// consistency. I.e. this MAY result in writes to the static files.
#[instrument(err, skip(self))]
pub fn check_consistency(&self) -> ProviderResult<(Option<u64>, Option<u64>)> {
let provider_ro = self.database_provider_ro()?;
let provider_ro = self
.database_provider_ro()?
// Healing can run long-lived read transactions (e.g., iterating changesets
// over millions of blocks). Disable the default timeout so MDBX doesn't
// kill the transaction mid-heal, which causes a crash loop on startup.
.disable_long_read_transaction_safety();
// Step 1: heal file-level inconsistencies (no pruning)
self.static_file_provider().check_file_consistency(&provider_ro)?;
@@ -367,8 +387,59 @@ impl<N: ProviderNodeTypes> ProviderFactory<N> {
},
);
// Step 4: Heal finalized/safe block numbers that may be ahead of the
// highest header on nodes coming from <=1.10.2.
//
// Unwinds already set it to the target block.
if rocksdb_unwind.is_none() && static_file_unwind.is_none() {
self.heal_chain_state_block_numbers(&provider_ro)?;
}
Ok((rocksdb_unwind, static_file_unwind))
}
/// If the stored finalized or safe block number is ahead of the highest
/// header, resets it to the highest header.
fn heal_chain_state_block_numbers(
&self,
provider_ro: &DatabaseProvider<<N::DB as Database>::TX, N>,
) -> ProviderResult<()> {
let highest_header = self.last_block_number()?;
let finalized = provider_ro.last_finalized_block_number()?;
let safe = provider_ro.last_safe_block_number()?;
if finalized.is_none_or(|f| f <= highest_header) && safe.is_none_or(|s| s <= highest_header)
{
return Ok(());
}
let provider_rw = self.provider_rw()?;
if let Some(finalized) = finalized.filter(|&f| f > highest_header) {
info!(
target: "providers::db",
finalized,
highest_header,
"Healing finalized block number",
);
provider_rw.save_finalized_block_number(highest_header)?;
}
if let Some(safe) = safe.filter(|&s| s > highest_header) {
info!(
target: "providers::db",
safe,
highest_header,
"Healing safe block number",
);
provider_rw.save_safe_block_number(highest_header)?;
}
provider_rw.commit()?;
Ok(())
}
}
impl<N: NodeTypesWithDB> NodePrimitivesProvider for ProviderFactory<N> {
@@ -748,6 +819,7 @@ where
rocksdb_provider,
changeset_cache,
runtime,
minimum_pruning_distance,
} = self;
f.debug_struct("ProviderFactory")
.field("db", &db)
@@ -759,6 +831,7 @@ where
.field("rocksdb_provider", &rocksdb_provider)
.field("changeset_cache", &changeset_cache)
.field("runtime", &runtime)
.field("minimum_pruning_distance", &minimum_pruning_distance)
.finish()
}
}
@@ -775,6 +848,7 @@ impl<N: NodeTypesWithDB> Clone for ProviderFactory<N> {
rocksdb_provider: self.rocksdb_provider.clone(),
changeset_cache: self.changeset_cache.clone(),
runtime: self.runtime.clone(),
minimum_pruning_distance: self.minimum_pruning_distance,
}
}
}

View File

@@ -205,7 +205,6 @@ pub struct DatabaseProvider<TX, N: NodeTypes> {
/// Path to the database directory.
db_path: PathBuf,
/// Pending `RocksDB` batches to be committed at provider commit time.
#[cfg_attr(not(all(unix, feature = "rocksdb")), allow(dead_code))]
pending_rocksdb_batches: PendingRocksDBBatches,
/// Commit order for database operations.
commit_order: CommitOrder,
@@ -239,6 +238,12 @@ impl<TX, N: NodeTypes> DatabaseProvider<TX, N> {
pub const fn prune_modes_ref(&self) -> &PruneModes {
&self.prune_modes
}
/// Sets the minimum pruning distance.
pub const fn with_minimum_pruning_distance(mut self, distance: u64) -> Self {
self.minimum_pruning_distance = distance;
self
}
}
impl<TX: DbTx + 'static, N: NodeTypes> DatabaseProvider<TX, N> {
@@ -323,12 +328,10 @@ impl<TX, N: NodeTypes> RocksDBProviderFactory for DatabaseProvider<TX, N> {
self.rocksdb_provider.clone()
}
#[cfg(all(unix, feature = "rocksdb"))]
fn set_pending_rocksdb_batch(&self, batch: rocksdb::WriteBatchWithTransaction<true>) {
self.pending_rocksdb_batches.lock().push(batch);
}
#[cfg(all(unix, feature = "rocksdb"))]
fn commit_pending_rocksdb_batches(&self) -> ProviderResult<()> {
let batches = std::mem::take(&mut *self.pending_rocksdb_batches.lock());
for batch in batches {
@@ -455,16 +458,11 @@ impl<TX: DbTx + DbTxMut + 'static, N: NodeTypesForProvider> DatabaseProvider<TX,
where
F: FnOnce(RocksBatchArg<'_>) -> ProviderResult<(R, Option<RawRocksDBBatch>)>,
{
#[cfg(all(unix, feature = "rocksdb"))]
let rocksdb = self.rocksdb_provider();
#[cfg(all(unix, feature = "rocksdb"))]
let rocksdb_batch = rocksdb.batch();
#[cfg(not(all(unix, feature = "rocksdb")))]
let rocksdb_batch = ();
let (result, raw_batch) = f(rocksdb_batch)?;
#[cfg(all(unix, feature = "rocksdb"))]
if let Some(batch) = raw_batch {
self.set_pending_rocksdb_batch(batch);
}
@@ -503,7 +501,6 @@ impl<TX: DbTx + DbTxMut + 'static, N: NodeTypesForProvider> DatabaseProvider<TX,
}
/// Creates the context for `RocksDB` writes.
#[cfg_attr(not(all(unix, feature = "rocksdb")), allow(dead_code))]
fn rocksdb_write_ctx(&self, first_block: BlockNumber) -> RocksDBWriteCtx {
RocksDBWriteCtx {
first_block_number: first_block,
@@ -557,20 +554,17 @@ impl<TX: DbTx + DbTxMut + 'static, N: NodeTypesForProvider> DatabaseProvider<TX,
nums
};
let mut timings = metrics::SaveBlocksTimings { block_count, ..Default::default() };
let mut timings =
metrics::SaveBlocksTimings { batch_size: block_count, ..Default::default() };
// avoid capturing &self.tx in scope below.
let sf_provider = &self.static_file_provider;
let sf_ctx = self.static_file_write_ctx(save_mode, first_number, last_block_number)?;
#[cfg(all(unix, feature = "rocksdb"))]
let rocksdb_provider = self.rocksdb_provider.clone();
#[cfg(all(unix, feature = "rocksdb"))]
let rocksdb_ctx = self.rocksdb_write_ctx(first_number);
#[cfg(all(unix, feature = "rocksdb"))]
let rocksdb_enabled = rocksdb_ctx.storage_settings.storage_v2;
let mut sf_result = None;
#[cfg(all(unix, feature = "rocksdb"))]
let mut rocksdb_result = None;
// Write to all backends in parallel.
@@ -591,7 +585,6 @@ impl<TX: DbTx + DbTxMut + 'static, N: NodeTypesForProvider> DatabaseProvider<TX,
});
// RocksDB writes
#[cfg(all(unix, feature = "rocksdb"))]
if rocksdb_enabled {
s.spawn(|_| {
let _guard = span.enter();
@@ -713,7 +706,6 @@ impl<TX: DbTx + DbTxMut + 'static, N: NodeTypesForProvider> DatabaseProvider<TX,
// Collect results from spawned tasks
timings.sf = sf_result.ok_or(StaticFileWriterError::ThreadPanic("static file"))??;
#[cfg(all(unix, feature = "rocksdb"))]
if rocksdb_enabled {
timings.rocksdb = rocksdb_result.ok_or_else(|| {
ProviderError::Database(reth_db_api::DatabaseError::Other(
@@ -1940,8 +1932,8 @@ impl<TX: DbTx + 'static, N: NodeTypesForProvider> TransactionsProvider for Datab
type Transaction = TxTy<N>;
fn transaction_id(&self, tx_hash: TxHash) -> ProviderResult<Option<TxNumber>> {
self.with_rocksdb_tx(|tx_ref| {
let mut reader = EitherReader::new_transaction_hash_numbers(self, tx_ref)?;
self.with_rocksdb_snapshot(|rocksdb_ref| {
let mut reader = EitherReader::new_transaction_hash_numbers(self, rocksdb_ref)?;
reader.get_transaction_hash_number(tx_hash)
})
}
@@ -3271,11 +3263,8 @@ impl<TX: DbTxMut + DbTx + 'static, N: NodeTypes> HistoryWriter for DatabaseProvi
last_indices.sort_unstable_by_key(|(a, _)| *a);
if self.cached_storage_settings().storage_v2 {
#[cfg(all(unix, feature = "rocksdb"))]
{
let batch = self.rocksdb_provider.unwind_account_history_indices(&last_indices)?;
self.pending_rocksdb_batches.lock().push(batch);
}
let batch = self.rocksdb_provider.unwind_account_history_indices(&last_indices)?;
self.pending_rocksdb_batches.lock().push(batch);
} else {
// Unwind the account history index in MDBX.
let mut cursor = self.tx.cursor_write::<tables::AccountsHistory>()?;
@@ -3331,12 +3320,9 @@ impl<TX: DbTxMut + DbTx + 'static, N: NodeTypes> HistoryWriter for DatabaseProvi
storage_changesets.sort_by_key(|(address, key, _)| (*address, *key));
if self.cached_storage_settings().storage_v2 {
#[cfg(all(unix, feature = "rocksdb"))]
{
let batch =
self.rocksdb_provider.unwind_storage_history_indices(&storage_changesets)?;
self.pending_rocksdb_batches.lock().push(batch);
}
let batch =
self.rocksdb_provider.unwind_storage_history_indices(&storage_changesets)?;
self.pending_rocksdb_batches.lock().push(batch);
} else {
// Unwind the storage history index in MDBX.
let mut cursor = self.tx.cursor_write::<tables::StoragesHistory>()?;
@@ -3693,7 +3679,6 @@ impl<TX: DbTxMut + DbTx + 'static, N: NodeTypesForProvider> BlockWriter
// append_*_history_shard which handles read-merge-write internally.
let storage_settings = self.cached_storage_settings();
if storage_settings.storage_v2 {
#[cfg(all(unix, feature = "rocksdb"))]
self.with_rocksdb_batch(|mut batch| {
for (address, blocks) in account_transitions {
batch.append_account_history_shard(address, blocks)?;
@@ -3704,7 +3689,6 @@ impl<TX: DbTxMut + DbTx + 'static, N: NodeTypesForProvider> BlockWriter
self.insert_account_history_index(account_transitions)?;
}
if storage_settings.storage_v2 {
#[cfg(all(unix, feature = "rocksdb"))]
self.with_rocksdb_batch(|mut batch| {
for ((address, key), blocks) in storage_transitions {
batch.append_storage_history_shard(address, key, blocks)?;
@@ -3841,12 +3825,9 @@ impl<TX: DbTx + 'static, N: NodeTypes + 'static> DBProvider for DatabaseProvider
if self.static_file_provider.has_unwind_queued() || self.commit_order.is_unwind() {
self.tx.commit()?;
#[cfg(all(unix, feature = "rocksdb"))]
{
let batches = std::mem::take(&mut *self.pending_rocksdb_batches.lock());
for batch in batches {
self.rocksdb_provider.commit_batch(batch)?;
}
let batches = std::mem::take(&mut *self.pending_rocksdb_batches.lock());
for batch in batches {
self.rocksdb_provider.commit_batch(batch)?;
}
self.static_file_provider.commit()?;
@@ -3858,15 +3839,12 @@ impl<TX: DbTx + 'static, N: NodeTypes + 'static> DBProvider for DatabaseProvider
self.static_file_provider.finalize()?;
timings.sf = start.elapsed();
#[cfg(all(unix, feature = "rocksdb"))]
{
let start = Instant::now();
let batches = std::mem::take(&mut *self.pending_rocksdb_batches.lock());
for batch in batches {
self.rocksdb_provider.commit_batch(batch)?;
}
timings.rocksdb = start.elapsed();
let start = Instant::now();
let batches = std::mem::take(&mut *self.pending_rocksdb_batches.lock());
for batch in batches {
self.rocksdb_provider.commit_batch(batch)?;
}
timings.rocksdb = start.elapsed();
let start = Instant::now();
self.tx.commit()?;
@@ -5135,28 +5113,24 @@ mod tests {
}
}
#[cfg(all(unix, feature = "rocksdb"))]
{
let rocksdb = factory.rocksdb_provider();
for block_num in 1..=num_blocks {
for acct_idx in 0..accounts_per_block {
let address =
Address::with_last_byte((block_num * 10 + acct_idx as u64) as u8);
let shards = rocksdb.account_history_shards(address).unwrap();
let rocksdb = factory.rocksdb_provider();
for block_num in 1..=num_blocks {
for acct_idx in 0..accounts_per_block {
let address = Address::with_last_byte((block_num * 10 + acct_idx as u64) as u8);
let shards = rocksdb.account_history_shards(address).unwrap();
assert!(
!shards.is_empty(),
"v2: RocksDB AccountsHistory missing for block {block_num} acct {acct_idx}"
);
for s in 1..=slots_per_account as u64 {
let slot = U256::from(s + acct_idx as u64 * 100);
let slot_key = B256::from(slot);
let shards = rocksdb.storage_history_shards(address, slot_key).unwrap();
assert!(
!shards.is_empty(),
"v2: RocksDB AccountsHistory missing for block {block_num} acct {acct_idx}"
"v2: RocksDB StoragesHistory missing for block {block_num} acct {acct_idx} slot {s}"
);
for s in 1..=slots_per_account as u64 {
let slot = U256::from(s + acct_idx as u64 * 100);
let slot_key = B256::from(slot);
let shards = rocksdb.storage_history_shards(address, slot_key).unwrap();
assert!(
!shards.is_empty(),
"v2: RocksDB StoragesHistory missing for block {block_num} acct {acct_idx} slot {s}"
);
}
}
}
}
@@ -5367,7 +5341,6 @@ mod tests {
}
#[test]
#[cfg(all(unix, feature = "rocksdb"))]
fn test_unwind_storage_history_indices_v2() {
let factory = create_test_provider_factory();
factory.set_storage_settings_cache(StorageSettings::v2());

View File

@@ -32,15 +32,11 @@ pub use blockchain_provider::BlockchainProvider;
mod consistent;
pub use consistent::ConsistentProvider;
// RocksDB currently only supported on Unix platforms
// Windows support is planned for future releases
#[cfg_attr(all(unix, feature = "rocksdb"), path = "rocksdb/mod.rs")]
#[cfg_attr(not(all(unix, feature = "rocksdb")), path = "rocksdb_stub.rs")]
pub(crate) mod rocksdb;
pub use rocksdb::{
PruneShardOutcome, PrunedIndices, RocksDBBatch, RocksDBBuilder, RocksDBIter, RocksDBProvider,
RocksDBRawIter, RocksDBStats, RocksDBTableStats, RocksTx,
RocksDBRawIter, RocksDBStats, RocksDBTableStats, RocksReadSnapshot, RocksTx,
};
/// Helper trait to bound [`NodeTypes`] so that combined with database they satisfy

Some files were not shown because too many files have changed in this diff Show More