Compare commits

..

84 Commits

Author SHA1 Message Date
Dan Cline
3d88906c15 chore: regenerate CLI docs after rebase
Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-02-25 14:54:37 -05:00
Georgios Konstantopoulos
923a7b8b55 fix: update changeset offset API and generate CLI docs
- Update split_account_changesets and split_storage_changesets to use
  the new read_changeset_offset() method instead of the removed
  changeset_offset() on SegmentHeader
- Generate CLI docs for the new static-files split command

Amp-Thread-ID: https://ampcode.com/threads/T-019c6eb5-dde6-75ed-bf8c-b520a95d9fd4
Co-authored-by: Amp <amp@ampcode.com>
2026-02-25 14:54:37 -05:00
Dan Cline
e73f30b752 chore: static file split cli command 2026-02-25 14:54:37 -05:00
Dan Cline
f84d5e6d7f chore: add Rjected as crates/cli codeowner (#22580) 2026-02-25 20:44:00 +01:00
Arsenii Kulikov
e63b6239d7 ci(bench): support configuring number of cores (#22573) 2026-02-25 17:28:35 +00:00
Matthias Seitz
660a0dee90 feat(net): persist richer peer metadata to peers file (#22557)
Co-authored-by: Amp <amp@ampcode.com>
2026-02-25 17:03:25 +00:00
Arsenii Kulikov
f92c9b4370 perf: delay branch masks updates (#22565) 2026-02-25 15:35:12 +00:00
Brian Picciano
f0e2522294 perf: Remove unnecessary single-target storage proofs (#22539)
Co-authored-by: Amp <amp@ampcode.com>
Co-authored-by: Arsenii Kulikov <klkvrr@gmail.com>
2026-02-25 14:35:23 +00:00
Matthias Seitz
7103088adc feat(txpool): support additional custom validation checks in EthTransactionValidator (#22559)
Co-authored-by: Amp <amp@ampcode.com>
2026-02-25 13:32:21 +00:00
Derek Cofausper
663765af5c ci(bench): skip DM when results are posted to channel (#22563)
Co-authored-by: Amp <amp@ampcode.com>
2026-02-25 13:18:25 +00:00
Zac Holme
20cfb2d517 fix: compute hashed post state in RpcBlockchainStateProvider (#22546)
Co-authored-by: Claude Opus 4.5 <noreply@anthropic.com>
2026-02-25 10:48:26 +00:00
Georgios Konstantopoulos
0bdf6e2f2e chore(engine): add debug log in spawned tx iterator after yielding tx index (#22558)
Co-authored-by: Amp <amp@ampcode.com>
2026-02-25 08:49:33 +00:00
Georgios Konstantopoulos
85abd41824 perf: add thread-priority utils and boost engine/sparse-trie priority (#22541)
Co-authored-by: Amp <amp@ampcode.com>
2026-02-25 08:33:35 +00:00
James Niken
70fb03a530 refactor(chainspec): use existing paris difficulty getter (#22474)
Co-authored-by: Matthias Seitz <matthias.seitz@outlook.de>
2026-02-25 05:39:12 +00:00
Georgios Konstantopoulos
96fce4dc4f chore: remove unmaintained shellexpand dependency (#22514)
Co-authored-by: Amp <amp@ampcode.com>
2026-02-25 05:38:39 +00:00
Elaela Solis
728c7acd08 feat(exex): expose ExExManager buffer capacity in ExExLauncher (#22553) 2026-02-25 05:15:55 +00:00
stevencartavia
626c82db33 refactor(rpc): use replay_transactions_until in debug_trace_call_at_tx_index (#22542) 2026-02-25 05:00:48 +00:00
stevencartavia
624fcbd345 refactor(rpc): extract proof window validation into reusable helper (#22552) 2026-02-25 04:55:05 +00:00
Georgios Konstantopoulos
aed47bc3f8 fix(ci): add fallback for BENCH_JOB_URL in bench failure step (#22550)
Co-authored-by: Amp <amp@ampcode.com>
2026-02-24 21:21:41 +00:00
Dan Cline
7680c1e4f6 fix: detect and remove stale CLI doc pages (#22433)
Co-authored-by: Claude Haiku 4.5 <noreply@anthropic.com>
2026-02-24 19:34:55 +00:00
Dan Cline
93cb4068d2 fix: handle payload builder stream termination gracefully (#21710) 2026-02-24 19:24:24 +00:00
Georgios Konstantopoulos
2fba05dc67 feat(rpc): add reth_forkchoiceUpdated endpoint (#22536)
Co-authored-by: Arsenii Kulikov <klkvr@users.noreply.github.com>
Co-authored-by: Amp <amp@ampcode.com>
2026-02-24 18:29:38 +00:00
Alexey Shekhirin
ea143d4d31 ci(bench): report panics and error logs in comments (#22544)
Co-authored-by: Claude Opus 4.6 <noreply@anthropic.com>
2026-02-24 18:19:22 +00:00
Matthias Seitz
fddb7dad10 feat(net): use fork_id as tiebreaker in peer selection (#22545)
Co-authored-by: Amp <amp@ampcode.com>
2026-02-24 18:12:25 +00:00
Arsenii Kulikov
af6d674cac perf: decrease chunk size (#22527)
Co-authored-by: Georgios Konstantopoulos <me@gakonst.com>
Co-authored-by: Amp <amp@ampcode.com>
2026-02-24 17:56:39 +00:00
Georgios Konstantopoulos
de5688a76e perf(engine): remove spawn for prewarm pool init (#22543)
Co-authored-by: Amp <amp@ampcode.com>
2026-02-24 17:35:20 +00:00
figtracer
d4cb91f0a5 perf(txpool): use BTree range queries in pending_txs_by_sender/queued_txs_by_sender (#22528)
Co-authored-by: Claude Opus 4.6 <noreply@anthropic.com>
2026-02-24 17:01:59 +00:00
Georgios Konstantopoulos
d122c7b49c chore(tasks): remove quanta upkeep from runtime (#22540)
Co-authored-by: Amp <amp@ampcode.com>
2026-02-24 16:39:10 +00:00
Arsenii Kulikov
aed9014e1e chore: don't include spans for noops (#22538) 2026-02-24 16:21:38 +00:00
Arsenii Kulikov
d340114d52 refactor: don't return hashes for blinded nodes (#22535) 2026-02-24 16:08:40 +00:00
Georgios Konstantopoulos
7fc22f7b5b feat(rpc): accept RLP-encoded blocks in reth_newPayload (#22533)
Co-authored-by: Amp <amp@ampcode.com>
Co-authored-by: Arsenii Kulikov <klkvrr@gmail.com>
2026-02-24 15:46:02 +00:00
Georgios Konstantopoulos
c8c5f8886d perf(engine): use rayon par_iter for tx prewarming instead of manual workers (#22521)
Co-authored-by: Amp <amp@ampcode.com>
Co-authored-by: DaniPopes <57450786+DaniPopes@users.noreply.github.com>
2026-02-24 15:42:06 +00:00
Arsenii Kulikov
2f3c8d7d03 feat(bench): enable --log.samply when samply is configred (#22526) 2026-02-24 13:03:19 +00:00
Georgios Konstantopoulos
a90f8be67b revert: "perf(trie): replace Box clone with unsafe reborrow in prune (#22516)" (#22525) 2026-02-24 12:14:22 +00:00
Georgios Konstantopoulos
7faca05344 refactor(engine): use spawn_blocking_named for tx_iterator thread (#22522)
Co-authored-by: Amp <amp@ampcode.com>
2026-02-24 11:07:19 +00:00
Matthias Seitz
2827b0aca0 refactor: simplify uncle block fetching in RPC (#22523)
Co-authored-by: Amp <amp@ampcode.com>
2026-02-24 12:11:33 +01:00
Georgios Konstantopoulos
d3bb2faf28 refactor(rpc): extract RethEngineApi into standalone struct (#22504)
Co-authored-by: Amp <amp@ampcode.com>
Co-authored-by: Alexey Shekhirin <github@shekhirin.com>
Co-authored-by: Claude Opus 4.6 <noreply@anthropic.com>
2026-02-24 10:57:13 +00:00
Arsenii Kulikov
ef292ffa00 fix: don't produce both updates and removals for trie nodes (#22507)
Co-authored-by: Brian Picciano <me@mediocregopher.com>
2026-02-24 10:36:34 +00:00
Georgios Konstantopoulos
ea98d37bb3 ci: use native ARM runner for aarch64 linux release builds (#22519)
Co-authored-by: Amp <amp@ampcode.com>
2026-02-24 09:16:38 +00:00
Emma Jamieson-Hoare
f2b3201187 chore(release-builds): remove the riscv builds from release pipeline (#22499) 2026-02-24 09:00:48 +00:00
Georgios Konstantopoulos
d1cbf6ca5a perf(trie): reserve capacity in apply_subtrie_update_actions (#22517)
Co-authored-by: Amp <amp@ampcode.com>
2026-02-24 08:38:09 +00:00
Georgios Konstantopoulos
56bb47709c perf(trie): replace Box clone with unsafe reborrow in prune (#22516)
Co-authored-by: Amp <amp@ampcode.com>
2026-02-24 08:22:52 +00:00
Elaela Solis
3703255d5d fix: make SerdeBincodeCompat generic for EthereumTxEnvelope (#22513) 2026-02-24 08:11:15 +00:00
DaniPopes
b431caf806 fix: avoid duplicate runtime initialization on startup (#22515) 2026-02-24 07:49:48 +00:00
Matthias Seitz
21dadb71c3 fix: update shellexpand to 3.1.2 and unpin nightly (#22506)
Co-authored-by: Amp <amp@ampcode.com>
2026-02-23 21:42:58 +01:00
Arsenii Kulikov
98c45a4245 fix: remove debug_asssert! (#22505) 2026-02-23 20:13:51 +00:00
Arsenii Kulikov
ac2cc7b4e2 fix: proper SerdeBincodeCompat for EthereumReceipt (#22461) 2026-02-23 19:31:24 +00:00
Arsenii Kulikov
3931affcf2 revert: feat(rpc): move reth_newPayload from EngineApi to RethApi (#22500) 2026-02-23 18:33:59 +00:00
Alexey Shekhirin
93b7ae9286 chore(storage): propagate span context across rayon thread boundaries (#22497)
Co-authored-by: Claude Haiku 4.5 <noreply@anthropic.com>
2026-02-23 18:17:18 +00:00
Emma Jamieson-Hoare
7e7717bdaa chore: release 1.11.1 (#22496)
Co-authored-by: Amp <amp@ampcode.com>
2026-02-23 18:07:50 +00:00
Georgios Konstantopoulos
815037e27d feat(storage): slot preimage DB for plain changeset keys in v2 (#22379)
Co-authored-by: Amp <amp@ampcode.com>
Co-authored-by: joshieDo <93316087+joshieDo@users.noreply.github.com>
Co-authored-by: Claude Opus 4.6 <noreply@anthropic.com>
2026-02-23 18:01:44 +00:00
Georgios Konstantopoulos
80bf5532ac perf(trie): pack StoredNibblesSubKey from 65→33 bytes, generic cursor factory (#22158)
Co-authored-by: Amp <amp@ampcode.com>
Co-authored-by: joshieDo <93316087+joshieDo@users.noreply.github.com>
2026-02-23 17:02:43 +00:00
Arsenii Kulikov
028e99191a perf: optimize sparse trie (#22418)
Co-authored-by: Alexey Shekhirin <github@shekhirin.com>
Co-authored-by: Brian Picciano <me@mediocregopher.com>
2026-02-23 16:18:45 +00:00
Georgios Konstantopoulos
dc35fc8251 feat(rpc): move reth_newPayload from EngineApi to RethApi (#22425)
Co-authored-by: Amp <amp@ampcode.com>
Co-authored-by: Arsenii Kulikov <klkvrr@gmail.com>
2026-02-23 15:43:20 +00:00
Georgios Konstantopoulos
285c325d71 feat(re-execute): work-stealing parallelization (#22242)
Co-authored-by: Amp <amp@ampcode.com>
Co-authored-by: Arsenii Kulikov <klkvrr@gmail.com>
2026-02-23 15:39:24 +00:00
Arsenii Kulikov
ca47a7e9f9 fix: overlay preparation on tokio (#22492) 2026-02-23 15:37:55 +00:00
MergeBot
6d718d0c21 fix(rpc): use actual configured limit in trace_filter (#22477) 2026-02-23 13:14:19 +00:00
YK
949111c953 perf(engine): precompute tx root during payload validation (#22489) 2026-02-23 10:35:22 +00:00
Georgios Konstantopoulos
742eb56949 perf(engine): add tracing spans for post-execution validation wait times (#22483)
Co-authored-by: Yong Kang <yongkangc@users.noreply.github.com>
Co-authored-by: Amp <amp@ampcode.com>
Co-authored-by: YK <chiayongkang@hotmail.com>
2026-02-23 09:56:09 +00:00
Matthias Seitz
4af4836ec1 ci: pin nightly to 2026-02-21 (#22485)
Co-authored-by: Amp <amp@ampcode.com>
2026-02-23 10:26:08 +01:00
figtracer
3bc71e7ec0 chore: use ValidPoolTransaction methods instead of reaching into inner field (#22475)
Co-authored-by: Claude Opus 4.6 <noreply@anthropic.com>
2026-02-22 18:15:49 +01:00
VolodymyrBg
03fbb6cafe fix(rpc): stop IPC handle in AuthServerHandle::stop() (#22467) 2026-02-22 07:56:52 +01:00
Alexey Shekhirin
b09b097a0b chore(ci): enhance benchmark artifact collection (#22457)
Co-authored-by: Claude Haiku 4.5 <noreply@anthropic.com>
2026-02-21 13:43:59 +00:00
MergeBot
0fffdcdd23 fix(tracing): handle file_writer in LogFmt format (#22429) 2026-02-21 09:12:32 +00:00
strmfos
bc33eb764a fix(txpool): prevent underflow in blobstore versioned hash lookup (#22454)
Co-authored-by: Matthias Seitz <matthias.seitz@outlook.de>
Co-authored-by: Amp <amp@ampcode.com>
2026-02-21 08:54:09 +00:00
Georgios Konstantopoulos
190157636e chore: remove unused Default impl for ExecutionEnv (#22451)
Co-authored-by: Amp <amp@ampcode.com>
2026-02-21 08:29:59 +00:00
figtracer
8e3bc6567c chore(txpool): use to_consensus helper instead of reaching into inner field (#22426)
Co-authored-by: Claude Opus 4.6 <noreply@anthropic.com>
2026-02-21 08:28:57 +00:00
Georgios Konstantopoulos
45b961c7b3 chore: deprecate reth-primitives crate (#22450)
Co-authored-by: Amp <amp@ampcode.com>
2026-02-21 08:16:34 +00:00
stevencartavia
94818d7676 feat(rpc): add reth_getBlockExecutionOutcome endpoint (#22397)
Co-authored-by: Matthias Seitz <matthias.seitz@outlook.de>
2026-02-21 05:47:05 +00:00
Alexey Shekhirin
4c2a9a9b4a feat(bench): add Slack notifications with Block Kit (#22447)
Co-authored-by: Claude Opus 4.6 <noreply@anthropic.com>
2026-02-21 00:09:07 +00:00
Alexey Shekhirin
76c37f0f80 ci(bench): install all runner dependencies from job (#22445)
Co-authored-by: Claude Haiku 4.5 <noreply@anthropic.com>
2026-02-20 21:54:45 +00:00
figtracer
0275ff35fd refactor(net): add methods to PropagatedTransactions instead of exposing .0 (#22441)
Co-authored-by: Amp <amp@ampcode.com>
Co-authored-by: Matthias Seitz <matthias.seitz@outlook.de>
2026-02-20 20:51:20 +00:00
Alexey Shekhirin
3f011c8328 ci(bench): add median lines to benchmark charts (#22439)
Co-authored-by: Claude Haiku 4.5 <noreply@anthropic.com>
2026-02-20 20:40:32 +00:00
figtracer
beac28dbb2 chore(payload): use Transaction::blob_versioned_hashes() directly (#22440)
Co-authored-by: Amp <amp@ampcode.com>
2026-02-20 18:59:06 +00:00
Alexey Shekhirin
bce100c6c8 ci(bench): add samply profiling support (#22432)
Co-authored-by: Claude Haiku 4.5 <noreply@anthropic.com>
2026-02-20 18:16:28 +00:00
Alexey Shekhirin
40e99a4a4f ci(bench): switch to @decofe bot and new secret names (#22434)
Co-authored-by: Claude Haiku 4.5 <noreply@anthropic.com>
2026-02-20 17:55:16 +00:00
Dan Cline
1ff88e43cd fix: handle missing rocksdb gracefully in read-only db commands (#22394)
Co-authored-by: Claude Haiku 4.5 <noreply@anthropic.com>
2026-02-20 17:18:43 +00:00
joshieDo
d23c244cd1 fix: align static-file changeset checksum with MDBX semantics (#22389)
Co-authored-by: Claude Opus 4.6 <noreply@anthropic.com>
2026-02-20 17:11:08 +00:00
Dan Cline
3de9259026 docs: add CLI docs regeneration guide (#22395)
Co-authored-by: Claude Haiku 4.5 <noreply@anthropic.com>
2026-02-20 16:46:18 +00:00
Alexey Shekhirin
d24f0b1e05 fix: update PR comment when bench workflow is cancelled (#22430)
Co-authored-by: Claude Opus 4.6 <noreply@anthropic.com>
2026-02-20 15:20:10 +00:00
iPLAY888
bb1b9ec611 fix(net): log transaction count instead of bool in broadcast debug log (#22417) 2026-02-20 12:15:40 +00:00
Arsenii Kulikov
70cab0d163 fix: properly reveal trie nodes (#22415) 2026-02-20 11:51:04 +00:00
Alexey Shekhirin
e530b1f6a1 refactor(bench): push charts to external repo instead of bench-charts branch (#22414)
Co-authored-by: Alexey Shekhirin <shekhirin@shekhirin-tempo.tail388b2e.ts.net>
Co-authored-by: Claude Opus 4.6 <noreply@anthropic.com>
2026-02-20 11:42:15 +00:00
Emma Jamieson-Hoare
ff5d375526 docs(hive): add comments explaining why flaky tests are ignored (#22383)
Co-authored-by: Amp <amp@ampcode.com>
2026-02-20 11:18:20 +00:00
207 changed files with 8111 additions and 3743 deletions

View File

@@ -0,0 +1,5 @@
---
reth-transaction-pool: minor
---
Added support for optional custom stateless and stateful validation hooks in `EthTransactionValidator` via `set_additional_stateless_validation` and `set_additional_stateful_validation` methods. Also implemented a manual `Debug` impl to handle the non-`Debug` function pointer fields.

View File

@@ -0,0 +1,7 @@
---
reth-network-types: minor
reth-network: minor
reth-node-core: patch
---
Added `PersistedPeerInfo` struct to persist richer peer metadata (kind, fork ID, reputation) to disk. Updated `PeersConfig::with_basic_nodes_from_file` to support both the new `PersistedPeerInfo` format and the legacy `Vec<NodeRecord>` format with automatic conversion, and updated `write_peers_to_file` to exclude backed-off and banned peers.

View File

@@ -0,0 +1,5 @@
---
reth-network: minor
---
Added `fork_id` as a tiebreaker in peer selection when reputations are equal, preferring peers with a discovered `fork_id` as it indicates fork compatibility. Added a test to verify the tiebreaker behavior.

View File

@@ -0,0 +1,5 @@
---
reth-trie-sparse: patch
---
Fixed a bug where trie nodes could appear in both `updated_nodes` and `removed_nodes` simultaneously by removing entries from `removed_nodes` when a node is inserted as updated.

2
.github/CODEOWNERS vendored
View File

@@ -1,7 +1,7 @@
* @gakonst
crates/chain-state/ @fgimenez @mattsse
crates/chainspec/ @Rjected @joshieDo @mattsse
crates/cli/ @mattsse
crates/cli/ @mattsse @Rjected
crates/config/ @shekhirin @mattsse @Rjected
crates/consensus/ @mattsse @Rjected
crates/e2e-test-utils/ @mattsse @Rjected @klkvr @fgimenez

View File

@@ -14,10 +14,10 @@
# baseline: <source-dir>/target/profiling/reth
# feature: <source-dir>/target/profiling/reth, reth-bench installed to cargo bin
#
# Required: mc (MinIO client) configured at /home/ubuntu/.mc
# Required: mc (MinIO client) with a configured alias
set -euo pipefail
MC="mc --config-dir /home/ubuntu/.mc"
MC="mc"
MODE="$1"
SOURCE_DIR="$2"
COMMIT="$3"

View File

@@ -73,22 +73,24 @@ def plot_latency_and_throughput(
for r in baseline:
lat_s = r["new_payload_latency_us"] / 1_000_000
base_ggas.append(r["gas_used"] / lat_s / GIGAGAS if lat_s > 0 else 0)
ax1.plot(base_x, base_lat, linewidth=0.8, label=baseline_name, alpha=0.7)
ax2.plot(base_x, base_ggas, linewidth=0.8, label=baseline_name, alpha=0.7)
l, = ax1.plot(base_x, base_lat, linewidth=0.8, label=baseline_name, alpha=0.7)
ax1.axhline(np.median(base_lat), color=l.get_color(), linestyle="--", linewidth=1, alpha=0.7, label=f"{baseline_name} median")
l, = ax2.plot(base_x, base_ggas, linewidth=0.8, label=baseline_name, alpha=0.7)
ax2.axhline(np.median(base_ggas), color=l.get_color(), linestyle="--", linewidth=1, alpha=0.7, label=f"{baseline_name} median")
ax1.plot(feat_x, feat_lat, linewidth=0.8, label=feature_name)
l, = ax1.plot(feat_x, feat_lat, linewidth=0.8, label=feature_name)
ax1.axhline(np.median(feat_lat), color=l.get_color(), linestyle="--", linewidth=1, label=f"{feature_name} median")
ax1.set_ylabel("Latency (ms)")
ax1.set_title("newPayload Latency per Block")
ax1.grid(True, alpha=0.3)
if baseline:
ax1.legend()
ax1.legend()
ax2.plot(feat_x, feat_ggas, linewidth=0.8, label=feature_name)
l, = ax2.plot(feat_x, feat_ggas, linewidth=0.8, label=feature_name)
ax2.axhline(np.median(feat_ggas), color=l.get_color(), linestyle="--", linewidth=1, label=f"{feature_name} median")
ax2.set_ylabel("Ggas/s")
ax2.set_title("Execution Throughput per Block")
ax2.grid(True, alpha=0.3)
if baseline:
ax2.legend()
ax2.legend()
if baseline:
ax3 = axes[2]

View File

@@ -18,14 +18,35 @@ LOG="${OUTPUT_DIR}/node.log"
cleanup() {
kill "$TAIL_PID" 2>/dev/null || true
if [ -n "${RETH_PID:-}" ] && sudo kill -0 "$RETH_PID" 2>/dev/null; then
sudo kill "$RETH_PID"
for i in $(seq 1 30); do
sudo kill -0 "$RETH_PID" 2>/dev/null || break
sleep 1
done
if [ "${BENCH_SAMPLY:-false}" = "true" ]; then
# Send SIGINT to the inner reth process by exact name (not -f which
# would also match samply's cmdline containing "reth"). Samply will
# capture reth's exit and save the profile.
sudo pkill -INT -x reth 2>/dev/null || true
# Wait for samply to finish writing the profile and exit
for i in $(seq 1 120); do
sudo pgrep -x samply > /dev/null 2>&1 || break
if [ $((i % 10)) -eq 0 ]; then
echo "Waiting for samply to finish writing profile... (${i}s)"
fi
sleep 1
done
if sudo pgrep -x samply > /dev/null 2>&1; then
echo "Samply still running after 120s, sending SIGTERM..."
sudo pkill -x samply 2>/dev/null || true
fi
else
sudo kill "$RETH_PID"
for i in $(seq 1 30); do
sudo kill -0 "$RETH_PID" 2>/dev/null || break
sleep 1
done
fi
sudo kill -9 "$RETH_PID" 2>/dev/null || true
sleep 1
fi
# Fix ownership of reth-created files (reth runs as root)
sudo chown -R "$(id -un):$(id -gn)" "$OUTPUT_DIR" 2>/dev/null || true
if mountpoint -q "$SCHELK_MOUNT"; then
sudo umount -l "$SCHELK_MOUNT" || true
sudo schelk recover -y || true
@@ -46,18 +67,38 @@ grep Cached /proc/meminfo
# CPU layout: core 0 = OS/IRQs/reth-bench/aux, cores 1+ = reth node
RETH_BENCH="$(which reth-bench)"
ONLINE=$(nproc --all)
RETH_CPUS="1-$(( ONLINE - 1 ))"
sudo taskset -c "$RETH_CPUS" nice -n -20 "$BINARY" node \
--datadir "$DATADIR" \
--engine.accept-execution-requests-hash \
--http \
--http.port 8545 \
--ws \
--ws.api all \
--authrpc.port 8551 \
--disable-discovery \
--no-persist-peers \
> "$LOG" 2>&1 &
MAX_RETH=$(( ONLINE - 1 ))
if [ "${BENCH_CORES:-0}" -gt 0 ] && [ "$BENCH_CORES" -lt "$MAX_RETH" ]; then
MAX_RETH=$BENCH_CORES
fi
RETH_CPUS="1-${MAX_RETH}"
RETH_ARGS=(
node
--datadir "$DATADIR"
--log.file.directory "$OUTPUT_DIR/reth-logs"
--engine.accept-execution-requests-hash
--http
--http.port 8545
--ws
--ws.api all
--authrpc.port 8551
--disable-discovery
--no-persist-peers
)
if [ "${BENCH_SAMPLY:-false}" = "true" ]; then
RETH_ARGS+=(--log.samply)
SAMPLY="$(which samply)"
sudo taskset -c "$RETH_CPUS" nice -n -20 \
"$SAMPLY" record --save-only --presymbolicate --rate 10000 \
--output "$OUTPUT_DIR/samply-profile.json.gz" \
-- "$BINARY" "${RETH_ARGS[@]}" \
> "$LOG" 2>&1 &
else
sudo taskset -c "$RETH_CPUS" nice -n -20 "$BINARY" "${RETH_ARGS[@]}" \
> "$LOG" 2>&1 &
fi
RETH_PID=$!
stdbuf -oL tail -f "$LOG" | sed -u "s/^/[reth] /" &

View File

@@ -243,13 +243,6 @@ def compute_paired_stats(
}
def compute_summary(combined: list[dict], gas: list[dict]) -> dict:
"""Compute aggregate metrics from parsed CSV data."""
blocks = len(combined)
return {
"blocks": blocks,
}
def format_duration(seconds: float) -> str:
if seconds >= 60:
@@ -274,22 +267,56 @@ def fmt_mgas(v: float) -> str:
return f"{v:.2f}"
def significance(pct: float, ci_pct: float, lower_is_better: bool) -> str:
"""Return significance label: 'good', 'bad', or 'neutral'."""
significant = abs(pct) > ci_pct
if not significant:
return "neutral"
elif (pct < 0) == lower_is_better:
return "good"
else:
return "bad"
def change_str(pct: float, ci_pct: float, lower_is_better: bool) -> str:
"""Format change% with paired CI significance.
Significant if the CI doesn't cross zero (i.e. |pct| > ci_pct).
"""
significant = abs(pct) > ci_pct
if not significant:
emoji = ""
elif (pct < 0) == lower_is_better:
emoji = ""
else:
emoji = ""
sig = significance(pct, ci_pct, lower_is_better)
emoji = {"good": "", "bad": "", "neutral": ""}[sig]
return f"{pct:+.2f}% {emoji}{ci_pct:.2f}%)"
def compute_changes(
baseline_stats: dict, feature_stats: dict, paired_stats: dict
) -> dict:
"""Pre-compute change percentages and significance for each metric."""
def pct(base: float, feat: float) -> float:
return (feat - base) / base * 100.0 if base > 0 else 0.0
def ci_pct(ci_ms: float, base_ms: float) -> float:
return ci_ms / base_ms * 100.0 if base_ms > 0 else 0.0
metrics = [
("mean", "mean_ms", "ci_ms", "mean_ms", True),
("p50", "p50_ms", "p50_ci_ms", "p50_ms", True),
("p90", "p90_ms", "p90_ci_ms", "p90_ms", True),
("p99", "p99_ms", "p99_ci_ms", "p99_ms", True),
("mgas_s", "mean_mgas_s", "mgas_ci", "mean_mgas_s", False),
]
changes = {}
for name, stat_key, ci_key, base_key, lower_is_better in metrics:
p = pct(baseline_stats[stat_key], feature_stats[stat_key])
c = ci_pct(paired_stats[ci_key], baseline_stats[base_key])
changes[name] = {
"pct": round(p, 4),
"ci_pct": round(c, 4),
"sig": significance(p, c, lower_is_better),
}
return changes
def generate_comparison_table(
run1: dict,
run2: dict,
@@ -438,11 +465,6 @@ def main():
all_baseline = [r for run in baseline_runs for r in run]
all_feature = [r for run in feature_runs for r in run]
summary = compute_summary(all_feature, gas)
with open(args.output_summary, "w") as f:
json.dump(summary, f, indent=2)
print(f"Summary written to {args.output_summary}")
baseline_stats = compute_stats(all_baseline)
feature_stats = compute_stats(all_feature)
paired_stats = compute_paired_stats(baseline_runs, feature_runs)
@@ -479,13 +501,40 @@ def main():
("execution_cache_wait_us", "Execution Cache Update Wait"),
]
wait_time_tables = []
wait_time_data = {}
for field, title in wait_fields:
b_stats = compute_wait_stats(all_baseline, field)
f_stats = compute_wait_stats(all_feature, field)
if b_stats and f_stats:
wait_time_data[field] = {
"title": title,
"baseline": b_stats,
"feature": f_stats,
}
table = generate_wait_time_table(title, b_stats, f_stats, baseline_label, feature_label)
if table:
wait_time_tables.append(table)
summary = {
"blocks": paired_stats["blocks"],
"baseline": {
"name": baseline_name,
"ref": baseline_ref,
"stats": baseline_stats,
},
"feature": {
"name": feature_name,
"ref": feature_sha,
"stats": feature_stats,
},
"paired": paired_stats,
"changes": compute_changes(baseline_stats, feature_stats, paired_stats),
"wait_times": wait_time_data,
}
with open(args.output_summary, "w") as f:
json.dump(summary, f, indent=2)
print(f"Summary written to {args.output_summary}")
markdown = generate_markdown(
summary, comparison_table,
wait_time_tables=wait_time_tables,

339
.github/scripts/bench-slack-notify.js vendored Normal file
View File

@@ -0,0 +1,339 @@
// Sends Slack notifications for reth-bench results.
//
// Reads from environment:
// SLACK_BENCH_BOT_TOKEN Slack Bot User OAuth Token (xoxb-...)
// SLACK_BENCH_CHANNEL Public channel ID for significant improvements
// BENCH_WORK_DIR Directory containing summary.json
// BENCH_PR PR number (may be empty)
// BENCH_ACTOR GitHub user who triggered the bench
// BENCH_JOB_URL URL to the Actions job page
// BENCH_SAMPLY 'true' if samply profiling was enabled
//
// Usage from actions/github-script:
// const notify = require('./.github/scripts/bench-slack-notify.js');
// await notify.success({ core, context });
// await notify.failure({ core, context, failedStep: '...' });
const fs = require('fs');
const path = require('path');
const SLACK_API = 'https://slack.com/api/chat.postMessage';
function loadSlackUsers(repoRoot) {
try {
const raw = fs.readFileSync(path.join(repoRoot, '.github', 'scripts', 'bench-slack-users.json'), 'utf8');
const data = JSON.parse(raw);
// Filter out non-user-ID entries (like _comment)
const users = {};
for (const [k, v] of Object.entries(data)) {
if (!k.startsWith('_') && typeof v === 'string' && v.startsWith('U')) {
users[k] = v;
}
}
return users;
} catch {
return {};
}
}
async function postToSlack(token, channel, blocks, text, core, threadTs) {
const payload = { channel, blocks, text, unfurl_links: false };
if (threadTs) payload.thread_ts = threadTs;
const resp = await fetch(SLACK_API, {
method: 'POST',
headers: {
'Authorization': `Bearer ${token}`,
'Content-Type': 'application/json',
},
body: JSON.stringify(payload),
});
const data = await resp.json();
if (!data.ok) {
core.warning(`Slack API error (channel ${channel}): ${JSON.stringify(data)}`);
}
return data;
}
function cell(text) {
const s = String(text);
return { type: 'raw_text', text: s || ' ' };
}
function buildSuccessBlocks({ summary, prNumber, actor, actorSlackId, jobUrl, repo, samplyUrls }) {
const b = summary.baseline.stats;
const f = summary.feature.stats;
const c = summary.changes;
const sigEmoji = { good: '\u2705', bad: '\u274c', neutral: '\u26aa' };
function fmtMs(v) { return v.toFixed(2) + 'ms'; }
function fmtMgas(v) { return v.toFixed(2); }
function fmtChange(ch) {
if (!ch.pct && !ch.ci_pct) return ' ';
const pctStr = `${ch.pct >= 0 ? '+' : ''}${ch.pct.toFixed(2)}%`;
const ciStr = ch.ci_pct ? ` (\u00b1${ch.ci_pct.toFixed(2)}%)` : '';
return `${pctStr}${ciStr} ${sigEmoji[ch.sig]}`;
}
// Overall result for header
const vals = Object.values(c);
const hasBad = vals.some(v => v.sig === 'bad');
const hasGood = vals.some(v => v.sig === 'good');
let headerEmoji, headerResult;
if (hasBad && hasGood) {
headerEmoji = ':warning:';
headerResult = 'Mixed Results';
} else if (hasBad) {
headerEmoji = ':x:';
headerResult = 'Regression';
} else if (hasGood) {
headerEmoji = ':white_check_mark:';
headerResult = 'Improvement';
} else {
headerEmoji = ':white_circle:';
headerResult = 'No Difference';
}
const prUrl = prNumber ? `https://github.com/${repo}/pull/${prNumber}` : '';
const commitUrl = `https://github.com/${repo}/commit`;
const baselineLink = `<${commitUrl}/${summary.baseline.ref}|${summary.baseline.name}>`;
const featureLink = `<${commitUrl}/${summary.feature.ref}|${summary.feature.name}>`;
// Meta line
const metaParts = [];
if (prNumber) metaParts.push(`*<${prUrl}|PR #${prNumber}>*`);
metaParts.push(`triggered by ${actorSlackId ? `<@${actorSlackId}>` : `@${actor}`}`);
// Baseline/feature lines with samply profile links
let baselineLine = `*Baseline:* ${baselineLink}`;
const bl1 = samplyUrls['baseline-1'];
const bl2 = samplyUrls['baseline-2'];
if (bl1) baselineLine += ` | <${bl1}|Samply 1>`;
if (bl2) baselineLine += ` | <${bl2}|Samply 2>`;
let featureLine = `*Feature:* ${featureLink}`;
const fl1 = samplyUrls['feature-1'];
const fl2 = samplyUrls['feature-2'];
if (fl1) featureLine += ` | <${fl1}|Samply 1>`;
if (fl2) featureLine += ` | <${fl2}|Samply 2>`;
const warmup = summary.warmup_blocks || process.env.BENCH_WARMUP_BLOCKS || '';
const countsLine = warmup
? `*Warmup:* ${warmup} | *Blocks:* ${summary.blocks}`
: `*Blocks:* ${summary.blocks}`;
const sectionText = [metaParts.join(' | '), '', baselineLine, featureLine, countsLine].join('\n');
// Action buttons
const diffUrl = `https://github.com/${repo}/compare/${summary.baseline.ref}...${summary.feature.ref}`;
const buttons = [
{
type: 'button',
text: { type: 'plain_text', text: 'CI :github:', emoji: true },
url: jobUrl,
action_id: 'ci_button',
},
{
type: 'button',
text: { type: 'plain_text', text: 'Diff :github:', emoji: true },
url: diffUrl,
action_id: 'diff_button',
},
];
const blocks = [
{
type: 'header',
text: { type: 'plain_text', text: `${headerEmoji} ${headerResult}`, emoji: true },
},
{
type: 'section',
text: { type: 'mrkdwn', text: sectionText },
},
{
type: 'table',
column_settings: [
{ align: 'left' },
{ align: 'right' },
{ align: 'right' },
{ align: 'right' },
],
rows: [
[cell('Metric'), cell('Baseline'), cell('Feature'), cell('Change')],
[cell('Mean'), cell(fmtMs(b.mean_ms)), cell(fmtMs(f.mean_ms)), cell(fmtChange(c.mean))],
[cell('StdDev'), cell(fmtMs(b.stddev_ms)), cell(fmtMs(f.stddev_ms)), cell(' ')],
[cell('P50'), cell(fmtMs(b.p50_ms)), cell(fmtMs(f.p50_ms)), cell(fmtChange(c.p50))],
[cell('P90'), cell(fmtMs(b.p90_ms)), cell(fmtMs(f.p90_ms)), cell(fmtChange(c.p90))],
[cell('P99'), cell(fmtMs(b.p99_ms)), cell(fmtMs(f.p99_ms)), cell(fmtChange(c.p99))],
[cell('Mgas/s'), cell(fmtMgas(b.mean_mgas_s)), cell(fmtMgas(f.mean_mgas_s)), cell(fmtChange(c.mgas_s))],
],
},
{
type: 'actions',
elements: buttons,
},
];
// Wait times as a separate table block (sent as threaded reply due to Slack one-table limit)
const threadBlocks = [];
const waitTimes = summary.wait_times || {};
const waitKeys = Object.keys(waitTimes);
if (waitKeys.length > 0) {
const waitRows = [
[cell('Wait Time'), cell('Baseline'), cell('Feature')],
];
for (const key of waitKeys) {
const wt = waitTimes[key];
waitRows.push([cell(wt.title), cell(fmtMs(wt.baseline.mean_ms)), cell(fmtMs(wt.feature.mean_ms))]);
}
threadBlocks.push({
type: 'table',
column_settings: [
{ align: 'left' },
{ align: 'right' },
{ align: 'right' },
],
rows: waitRows,
});
}
return { blocks, threadBlocks };
}
function buildFailureBlocks({ prNumber, actor, actorSlackId, jobUrl, repo, failedStep }) {
const prUrl = prNumber ? `https://github.com/${repo}/pull/${prNumber}` : '';
const actorMention = actorSlackId ? `<@${actorSlackId}>` : `@${actor}`;
const parts = [
prNumber ? `*<${prUrl}|PR #${prNumber}>*` : '',
`by ${actorMention}`,
`failed while *${failedStep}*`,
].filter(Boolean);
const buttons = [
{
type: 'button',
text: { type: 'plain_text', text: 'CI :github:', emoji: true },
url: jobUrl,
action_id: 'ci_button',
},
];
return [
{
type: 'header',
text: { type: 'plain_text', text: ':rotating_light: Bench Failed', emoji: true },
},
{
type: 'section',
text: { type: 'mrkdwn', text: parts.join(' | ') },
},
{
type: 'actions',
elements: buttons,
},
];
}
async function success({ core, context }) {
const token = process.env.SLACK_BENCH_BOT_TOKEN;
if (!token) {
core.info('SLACK_BENCH_BOT_TOKEN not set, skipping Slack notification');
return;
}
let summary;
try {
summary = JSON.parse(fs.readFileSync(process.env.BENCH_WORK_DIR + '/summary.json', 'utf8'));
} catch (e) {
core.warning('Could not read summary.json for Slack notification');
return;
}
const repo = `${context.repo.owner}/${context.repo.repo}`;
const prNumber = process.env.BENCH_PR;
const actor = process.env.BENCH_ACTOR;
const jobUrl = process.env.BENCH_JOB_URL ||
`${context.serverUrl}/${context.repo.owner}/${context.repo.repo}/actions/runs/${context.runId}`;
// Load samply profile URLs (files exist when samply profiling was enabled)
const samplyUrls = {};
for (const run of ['baseline-1', 'baseline-2', 'feature-1', 'feature-2']) {
try {
const url = fs.readFileSync(
path.join(process.env.BENCH_WORK_DIR, run, 'samply-profile-url.txt'), 'utf8'
).trim();
if (url) samplyUrls[run] = url;
} catch {}
}
const slackUsers = loadSlackUsers(process.env.GITHUB_WORKSPACE || '.');
const actorSlackId = slackUsers[actor];
const { blocks, threadBlocks } = buildSuccessBlocks({ summary, prNumber, actor, actorSlackId, jobUrl, repo, samplyUrls });
const text = `Bench: ${summary.baseline.name} vs ${summary.feature.name}`;
async function sendWithThread(ch) {
const res = await postToSlack(token, ch, blocks, text, core);
if (res.ok && res.ts && threadBlocks.length > 0) {
for (const tb of threadBlocks) {
await postToSlack(token, ch, [tb], 'Wait time breakdown', core, res.ts);
}
}
}
// Post to public channel if any metric shows significant improvement or regression
const channel = process.env.SLACK_BENCH_CHANNEL;
let postedToChannel = false;
if (channel) {
const changes = summary.changes || {};
const hasImprovement = Object.values(changes).some(c => c.sig === 'good');
if (hasImprovement) {
await sendWithThread(channel);
postedToChannel = true;
} else {
core.info('No significant improvement, skipping public channel notification');
}
}
// DM the actor only when results were not posted to the public channel
if (!postedToChannel) {
if (actorSlackId) {
await sendWithThread(actorSlackId);
} else {
core.info(`No Slack user mapping for GitHub user '${actor}', skipping DM`);
}
} else {
core.info(`Results posted to channel, skipping DM to ${actor}`);
}
}
async function failure({ core, context, failedStep }) {
const token = process.env.SLACK_BENCH_BOT_TOKEN;
if (!token) {
core.info('SLACK_BENCH_BOT_TOKEN not set, skipping Slack notification');
return;
}
const repo = `${context.repo.owner}/${context.repo.repo}`;
const prNumber = process.env.BENCH_PR;
const actor = process.env.BENCH_ACTOR;
const jobUrl = process.env.BENCH_JOB_URL ||
`${context.serverUrl}/${context.repo.owner}/${context.repo.repo}/actions/runs/${context.runId}`;
const slackUsers = loadSlackUsers(process.env.GITHUB_WORKSPACE || '.');
const actorSlackId = slackUsers[actor];
const blocks = buildFailureBlocks({ prNumber, actor, actorSlackId, jobUrl, repo, failedStep });
const text = `Bench failed while ${failedStep}`;
// Always DM the actor
if (actorSlackId) {
await postToSlack(token, actorSlackId, blocks, text, core);
} else {
core.info(`No Slack user mapping for GitHub user '${actor}', skipping DM`);
}
// Only DM for failures, don't post to public channel
}
module.exports = { success, failure };

13
.github/scripts/bench-slack-users.json vendored Normal file
View File

@@ -0,0 +1,13 @@
{
"_comment": "Maps GitHub usernames to Slack user IDs. Find yours: Slack profile > ··· > Copy member ID.",
"shekhirin": "U09FAL2UMLJ",
"mattsse": "U09FQNPMRT3",
"klkvr": "U09FAK95FC2",
"joshieDo": "U09LHN6GYAU",
"mediocregopher": "U09FF75KMQU",
"yongkangc": "U09FB0ECTD4",
"gakonst": "U092SEPDM40",
"Rjected": "U09F6SCKRGT",
"DaniPopes": "U09FAT8EK2A",
"emmajam": "U0A34UN92HW"
}

View File

@@ -16,6 +16,9 @@ engine-withdrawals:
- Withdrawals Fork on Block 1 - 8 Block Re-Org NewPayload (Paris) (reth)
- Withdrawals Fork on Block 8 - 10 Block Re-Org NewPayload (Paris) (reth)
- Withdrawals Fork on Canonical Block 8 / Side Block 7 - 10 Block Re-Org (Paris) (reth)
# P2P sync timing issue in hive Docker environment: secondary client returns SYNCING but
# peer discovery/connection doesn't complete within the timeout when running with
# --sim.parallelism 16. Not a correctness bug, purely a CI timing issue.
- Sync after 2 blocks - Withdrawals on Block 2 - Multiple Withdrawal Accounts (Paris) (reth)
- Sync after 2 blocks - Withdrawals on Block 2 - Multiple Withdrawal Accounts - No Transactions (Paris) (reth)
- Sync after 128 blocks - Withdrawals on Block 2 - Multiple Withdrawal Accounts (Paris) (reth)
@@ -24,6 +27,9 @@ engine-cancun:
- Transaction Re-Org, Re-Org to Different Block (Cancun) (reth)
- Transaction Re-Org, Re-Org Out (Cancun) (reth)
- Invalid Missing Ancestor ReOrg, StateRoot, EmptyTxs=False, Invalid P9 (Cancun) (reth)
# Hive test infra bug: geth sidecar switched to PathScheme for state storage, which has
# strict trie integrity requirements incompatible with inserting intentionally invalid blocks.
# Affects all clients, not just reth. Tracked: https://github.com/ethereum/hive/issues/1382
- Invalid Missing Ancestor Syncing ReOrg, Timestamp, EmptyTxs=False, CanonicalReOrg=False, Invalid P8 (Cancun) (reth)
- Invalid Missing Ancestor Syncing ReOrg, Timestamp, EmptyTxs=False, CanonicalReOrg=True, Invalid P8 (Cancun) (reth)
- Multiple New Payloads Extending Canonical Chain, Wait for Canonical Payload (Cancun) (reth)

View File

@@ -35,17 +35,28 @@ on:
required: false
default: ""
type: string
samply:
description: "Enable samply profiling"
required: false
default: "false"
type: boolean
cores:
description: "Limit reth to N CPU cores (0 = all available)"
required: false
default: "0"
type: string
env:
CARGO_TERM_COLOR: always
BASELINE: base
SEED: reth
RUSTC_WRAPPER: "sccache"
BENCH_RUNNERS: 2
name: bench
permissions:
contents: write
contents: read
pull-requests: write
jobs:
@@ -91,7 +102,7 @@ jobs:
reth-bench-ack:
if: |
(github.event_name == 'issue_comment' && github.event.issue.pull_request && startsWith(github.event.comment.body, 'derek bench')) ||
(github.event_name == 'issue_comment' && github.event.issue.pull_request && (startsWith(github.event.comment.body, '@decofe bench') || startsWith(github.event.comment.body, 'derek bench'))) ||
github.event_name == 'workflow_dispatch'
name: reth-bench-ack
runs-on: ubuntu-latest
@@ -104,6 +115,8 @@ jobs:
feature: ${{ steps.args.outputs.feature }}
baseline-name: ${{ steps.args.outputs.baseline-name }}
feature-name: ${{ steps.args.outputs.feature-name }}
samply: ${{ steps.args.outputs.samply }}
cores: ${{ steps.args.outputs.cores }}
comment-id: ${{ steps.ack.outputs.comment-id }}
steps:
- name: Check org membership
@@ -129,8 +142,9 @@ jobs:
id: args
uses: actions/github-script@v8
with:
github-token: ${{ secrets.DEREK_PAT }}
script: |
let pr, actor, blocks, warmup, baseline, feature;
let pr, actor, blocks, warmup, baseline, feature, samply, cores;
if (context.eventName === 'workflow_dispatch') {
actor = '${{ github.actor }}';
@@ -138,6 +152,8 @@ jobs:
warmup = '${{ github.event.inputs.warmup }}' || '100';
baseline = '${{ github.event.inputs.baseline }}';
feature = '${{ github.event.inputs.feature }}';
samply = '${{ github.event.inputs.samply }}' === 'true' ? 'true' : 'false';
cores = '${{ github.event.inputs.cores }}' || '0';
// Find PR for the selected branch
const branch = '${{ github.ref_name }}';
@@ -157,16 +173,21 @@ jobs:
actor = context.payload.comment.user.login;
const body = context.payload.comment.body.trim();
const intArgs = new Set(['blocks', 'warmup']);
const intArgs = new Set(['blocks', 'warmup', 'cores']);
const refArgs = new Set(['baseline', 'feature']);
const defaults = { blocks: '500', warmup: '100', baseline: '', feature: '' };
const boolArgs = new Set(['samply']);
const defaults = { blocks: '500', warmup: '100', baseline: '', feature: '', samply: 'false', cores: '0' };
const unknown = [];
const invalid = [];
const args = body.replace(/^derek bench\s*/, '');
const args = body.replace(/^(?:@decofe|derek) bench\s*/, '');
for (const part of args.split(/\s+/).filter(Boolean)) {
const eq = part.indexOf('=');
if (eq === -1) {
unknown.push(part);
if (boolArgs.has(part)) {
defaults[part] = 'true';
} else {
unknown.push(part);
}
continue;
}
const key = part.slice(0, eq);
@@ -191,7 +212,7 @@ jobs:
if (unknown.length) errors.push(`Unknown argument(s): \`${unknown.join('`, `')}\``);
if (invalid.length) errors.push(`Invalid value(s): ${invalid.join(', ')}`);
if (errors.length) {
const msg = `❌ **Invalid bench command**\n\n${errors.join('\n')}\n\n**Usage:** \`derek bench [blocks=N] [warmup=N] [baseline=REF] [feature=REF]\``;
const msg = `❌ **Invalid bench command**\n\n${errors.join('\n')}\n\n**Usage:** \`@decofe bench [blocks=N] [warmup=N] [baseline=REF] [feature=REF] [samply] [cores=N]\``;
await github.rest.issues.createComment({
owner: context.repo.owner,
repo: context.repo.repo,
@@ -205,6 +226,8 @@ jobs:
warmup = defaults.warmup;
baseline = defaults.baseline;
feature = defaults.feature;
samply = defaults.samply;
cores = defaults.cores;
}
// Resolve display names for baseline/feature
@@ -231,11 +254,14 @@ jobs:
core.setOutput('feature', feature);
core.setOutput('baseline-name', baselineName);
core.setOutput('feature-name', featureName);
core.setOutput('samply', samply);
core.setOutput('cores', cores);
- name: Acknowledge request
id: ack
uses: actions/github-script@v8
with:
github-token: ${{ secrets.DEREK_PAT }}
script: |
if (context.eventName === 'issue_comment') {
await github.rest.reactions.createForIssueComment({
@@ -251,9 +277,11 @@ jobs:
const runUrl = `${context.serverUrl}/${context.repo.owner}/${context.repo.repo}/actions/runs/${context.runId}`;
// Count queued/waiting bench runs ahead of this one
// Count queued/waiting bench runs ahead of this one.
// BENCH_RUNNERS is the number of self-hosted runners available.
let queueMsg = '';
let ahead = 0;
const numRunners = parseInt(process.env.BENCH_RUNNERS) || 1;
try {
const statuses = ['queued', 'in_progress', 'waiting', 'requested', 'pending'];
const allRuns = [];
@@ -271,13 +299,13 @@ jobs:
const benchRuns = allRuns.filter(r => r.event === 'issue_comment' || r.event === 'workflow_dispatch');
const thisRun = benchRuns.find(r => r.id === context.runId);
const thisCreatedAt = thisRun ? new Date(thisRun.created_at) : new Date();
ahead = benchRuns.filter(r => r.id !== context.runId && new Date(r.created_at) <= thisCreatedAt).length;
const totalAhead = benchRuns.filter(r => r.id !== context.runId && new Date(r.created_at) <= thisCreatedAt).length;
ahead = Math.max(0, totalAhead - numRunners + 1);
if (ahead > 0) {
queueMsg = `\n🔢 **Queue position:** \`#${ahead + 1}\` (${ahead} job(s) ahead)`;
queueMsg = `\n🔢 **Queue position:** ${ahead} job(s) ahead (${numRunners} runner(s))`;
}
} catch (e) {
// Non-fatal — queue info is best-effort
core.info(`Could not fetch queue info: ${e.message}`);
core.info(`Skipping queue tracking: ${e.message}`);
}
const actor = '${{ steps.args.outputs.actor }}';
@@ -285,7 +313,11 @@ jobs:
const warmup = '${{ steps.args.outputs.warmup }}';
const baseline = '${{ steps.args.outputs.baseline-name }}';
const feature = '${{ steps.args.outputs.feature-name }}';
const config = `**Config:** ${blocks} blocks, ${warmup} warmup blocks, baseline: \`${baseline}\`, feature: \`${feature}\``;
const samply = '${{ steps.args.outputs.samply }}' === 'true';
const samplyNote = samply ? ', samply: `enabled`' : '';
const cores = '${{ steps.args.outputs.cores }}';
const coresNote = cores && cores !== '0' ? `, cores: \`${cores}\`` : '';
const config = `**Config:** ${blocks} blocks, ${warmup} warmup blocks, baseline: \`${baseline}\`, feature: \`${feature}\`${samplyNote}${coresNote}`;
const { data: comment } = await github.rest.issues.createComment({
owner: context.repo.owner,
@@ -300,6 +332,7 @@ jobs:
if: steps.ack.outputs.comment-id && steps.ack.outputs.queue-position != '0'
uses: actions/github-script@v8
with:
github-token: ${{ secrets.DEREK_PAT }}
script: |
const pr = '${{ steps.args.outputs.pr }}';
const commentId = parseInt('${{ steps.ack.outputs.comment-id }}');
@@ -308,9 +341,14 @@ jobs:
const warmup = '${{ steps.args.outputs.warmup }}';
const baseline = '${{ steps.args.outputs.baseline-name }}';
const feature = '${{ steps.args.outputs.feature-name }}';
const config = `**Config:** ${blocks} blocks, ${warmup} warmup blocks, baseline: \`${baseline}\`, feature: \`${feature}\``;
const samply = '${{ steps.args.outputs.samply }}' === 'true';
const samplyNote = samply ? ', samply: `enabled`' : '';
const cores = '${{ steps.args.outputs.cores }}';
const coresNote = cores && cores !== '0' ? `, cores: \`${cores}\`` : '';
const config = `**Config:** ${blocks} blocks, ${warmup} warmup blocks, baseline: \`${baseline}\`, feature: \`${feature}\`${samplyNote}${coresNote}`;
const runUrl = `${context.serverUrl}/${context.repo.owner}/${context.repo.repo}/actions/runs/${context.runId}`;
const numRunners = parseInt(process.env.BENCH_RUNNERS) || 1;
async function getQueuePosition() {
const statuses = ['queued', 'in_progress', 'waiting', 'requested', 'pending'];
const allRuns = [];
@@ -327,7 +365,8 @@ jobs:
const benchRuns = allRuns.filter(r => r.event === 'issue_comment' || r.event === 'workflow_dispatch');
const thisRun = benchRuns.find(r => r.id === context.runId);
const thisCreatedAt = thisRun ? new Date(thisRun.created_at) : new Date();
return benchRuns.filter(r => r.id !== context.runId && new Date(r.created_at) <= thisCreatedAt).length;
const totalAhead = benchRuns.filter(r => r.id !== context.runId && new Date(r.created_at) <= thisCreatedAt).length;
return { ahead: Math.max(0, totalAhead - numRunners + 1), numRunners };
}
let lastPosition = parseInt('${{ steps.ack.outputs.queue-position }}');
@@ -336,11 +375,11 @@ jobs:
while (true) {
await sleep(10_000);
try {
const ahead = await getQueuePosition();
const { ahead, numRunners } = await getQueuePosition();
if (ahead !== lastPosition) {
lastPosition = ahead;
const queueMsg = ahead > 0
? `\n🔢 **Queue position:** \`#${ahead + 1}\` (${ahead} job(s) ahead)`
? `\n🔢 **Queue position:** ${ahead} job(s) ahead (${numRunners} runner(s))`
: '';
await github.rest.issues.updateComment({
owner: context.repo.owner,
@@ -360,9 +399,6 @@ jobs:
name: reth-bench
runs-on: [self-hosted, Linux, X64]
timeout-minutes: 120
concurrency:
group: reth-bench-queue
cancel-in-progress: false
env:
BENCH_RPC_URL: https://ethereum.reth.rs/rpc
SCHELK_MOUNT: /reth-bench
@@ -371,6 +407,8 @@ jobs:
BENCH_ACTOR: ${{ needs.reth-bench-ack.outputs.actor }}
BENCH_BLOCKS: ${{ needs.reth-bench-ack.outputs.blocks }}
BENCH_WARMUP_BLOCKS: ${{ needs.reth-bench-ack.outputs.warmup }}
BENCH_SAMPLY: ${{ needs.reth-bench-ack.outputs.samply }}
BENCH_CORES: ${{ needs.reth-bench-ack.outputs.cores }}
BENCH_COMMENT_ID: ${{ needs.reth-bench-ack.outputs.comment-id }}
steps:
- name: Resolve checkout ref
@@ -405,6 +443,7 @@ jobs:
if: env.BENCH_COMMENT_ID
uses: actions/github-script@v8
with:
github-token: ${{ secrets.DEREK_PAT }}
script: |
const { data: jobs } = await github.rest.actions.listJobsForWorkflowRun({
owner: context.repo.owner,
@@ -419,7 +458,11 @@ jobs:
const warmup = process.env.BENCH_WARMUP_BLOCKS;
const baseline = '${{ needs.reth-bench-ack.outputs.baseline-name }}';
const feature = '${{ needs.reth-bench-ack.outputs.feature-name }}';
core.exportVariable('BENCH_CONFIG', `**Config:** ${blocks} blocks, ${warmup} warmup blocks, baseline: \`${baseline}\`, feature: \`${feature}\``);
const samply = process.env.BENCH_SAMPLY === 'true';
const samplyNote = samply ? ', samply: `enabled`' : '';
const cores = process.env.BENCH_CORES || '0';
const coresNote = cores && cores !== '0' ? `, cores: \`${cores}\`` : '';
core.exportVariable('BENCH_CONFIG', `**Config:** ${blocks} blocks, ${warmup} warmup blocks, baseline: \`${baseline}\`, feature: \`${feature}\`${samplyNote}${coresNote}`);
const { buildBody } = require('./.github/scripts/bench-update-status.js');
await github.rest.issues.updateComment({
@@ -433,6 +476,52 @@ jobs:
- uses: mozilla-actions/sccache-action@v0.0.9
continue-on-error: true
- name: Install dependencies
env:
DEREK_TOKEN: ${{ secrets.DEREK_TOKEN }}
run: |
mkdir -p "$HOME/.local/bin"
# apt packages
sudo apt-get update -qq
sudo apt-get install -y --no-install-recommends \
python3 make jq zstd curl dmsetup \
linux-tools-"$(uname -r)" || \
sudo apt-get install -y --no-install-recommends linux-tools-generic
# mc (MinIO client)
if ! command -v mc &>/dev/null; then
curl -sSfL https://dl.min.io/client/mc/release/linux-amd64/mc -o "$HOME/.local/bin/mc"
chmod +x "$HOME/.local/bin/mc"
fi
# uv (Python package manager)
if ! command -v uv &>/dev/null; then
curl -LsSf https://astral.sh/uv/install.sh | env UV_INSTALL_DIR="$HOME/.local/bin" sh
fi
# Configure git auth for private repos
git config --global url."https://x-access-token:${DEREK_TOKEN}@github.com/".insteadOf "https://github.com/"
# thin-provisioning-tools (era_invalidate, required by schelk)
if ! command -v era_invalidate &>/dev/null; then
git clone --depth 1 https://github.com/jthornber/thin-provisioning-tools /tmp/tpt
sudo make -C /tmp/tpt install
rm -rf /tmp/tpt
fi
# schelk (snapshot rollback tool, invoked via sudo)
if ! sudo sh -c 'command -v schelk' &>/dev/null; then
cargo install --git https://github.com/tempoxyz/schelk --locked
sudo install "$HOME/.cargo/bin/schelk" /usr/local/bin/
fi
# samply (optional CPU profiler, invoked via sudo)
if [ "${BENCH_SAMPLY:-false}" = "true" ] && ! sudo sh -c 'command -v samply' &>/dev/null; then
cargo install samply --git https://github.com/DaniPopes/samply --branch edge --locked
sudo install "$HOME/.cargo/bin/samply" /usr/local/bin/
fi
# Verify all required tools are available
- name: Check dependencies
run: |
@@ -470,32 +559,51 @@ jobs:
- name: Resolve refs
id: refs
run: |
BASELINE_ARG="${{ needs.reth-bench-ack.outputs.baseline }}"
FEATURE_ARG="${{ needs.reth-bench-ack.outputs.feature }}"
uses: actions/github-script@v8
with:
script: |
const { execSync } = require('child_process');
const run = (cmd) => execSync(cmd, { encoding: 'utf8' }).trim();
if [ -n "$BASELINE_ARG" ]; then
git fetch origin "$BASELINE_ARG" --quiet 2>/dev/null || true
BASELINE_REF=$(git rev-parse "$BASELINE_ARG" 2>/dev/null || git rev-parse "origin/$BASELINE_ARG" 2>/dev/null)
BASELINE_NAME="$BASELINE_ARG"
else
BASELINE_REF=$(git merge-base HEAD origin/main 2>/dev/null || echo "${{ github.sha }}")
BASELINE_NAME="main"
fi
const baselineArg = '${{ needs.reth-bench-ack.outputs.baseline }}';
const featureArg = '${{ needs.reth-bench-ack.outputs.feature }}';
if [ -n "$FEATURE_ARG" ]; then
git fetch origin "$FEATURE_ARG" --quiet 2>/dev/null || true
FEATURE_REF=$(git rev-parse "$FEATURE_ARG" 2>/dev/null || git rev-parse "origin/$FEATURE_ARG" 2>/dev/null)
FEATURE_NAME="$FEATURE_ARG"
else
FEATURE_REF="${{ steps.pr-info.outputs.head-sha }}"
FEATURE_NAME="${{ steps.pr-info.outputs.head-ref }}"
fi
let baselineRef, baselineName, featureRef, featureName;
echo "baseline-ref=$BASELINE_REF" >> "$GITHUB_OUTPUT"
echo "baseline-name=$BASELINE_NAME" >> "$GITHUB_OUTPUT"
echo "feature-ref=$FEATURE_REF" >> "$GITHUB_OUTPUT"
echo "feature-name=$FEATURE_NAME" >> "$GITHUB_OUTPUT"
if (baselineArg) {
try { run(`git fetch origin "${baselineArg}" --quiet`); } catch {}
try {
baselineRef = run(`git rev-parse "${baselineArg}"`);
} catch {
baselineRef = run(`git rev-parse "origin/${baselineArg}"`);
}
baselineName = baselineArg;
} else {
try {
baselineRef = run('git merge-base HEAD origin/main');
} catch {
baselineRef = '${{ github.sha }}';
}
baselineName = 'main';
}
if (featureArg) {
try { run(`git fetch origin "${featureArg}" --quiet`); } catch {}
try {
featureRef = run(`git rev-parse "${featureArg}"`);
} catch {
featureRef = run(`git rev-parse "origin/${featureArg}"`);
}
featureName = featureArg;
} else {
featureRef = '${{ steps.pr-info.outputs.head-sha }}';
featureName = '${{ steps.pr-info.outputs.head-ref }}';
}
core.setOutput('baseline-ref', baselineRef);
core.setOutput('baseline-name', baselineName);
core.setOutput('feature-ref', featureRef);
core.setOutput('feature-name', featureName);
- name: Check if snapshot needs update
id: snapshot-check
@@ -510,6 +618,7 @@ jobs:
if: env.BENCH_COMMENT_ID && steps.snapshot-check.outputs.needed == 'true'
uses: actions/github-script@v8
with:
github-token: ${{ secrets.DEREK_PAT }}
script: |
const s = require('./.github/scripts/bench-update-status.js');
await s({github, context, status: 'Building binaries & downloading snapshot...'});
@@ -617,6 +726,7 @@ jobs:
if: success() && env.BENCH_COMMENT_ID
uses: actions/github-script@v8
with:
github-token: ${{ secrets.DEREK_PAT }}
script: |
const s = require('./.github/scripts/bench-update-status.js');
await s({github, context, status: 'Running benchmarks...'});
@@ -639,6 +749,82 @@ jobs:
id: run-baseline-2
run: taskset -c 0 .github/scripts/bench-reth-run.sh baseline ../reth-baseline/target/profiling/reth "$BENCH_WORK_DIR/baseline-2"
- name: Scan logs for errors
if: "!cancelled()"
run: |
ERRORS_FILE="$BENCH_WORK_DIR/errors.md"
found=false
for run_dir in baseline-1 feature-1 feature-2 baseline-2; do
LOG="$BENCH_WORK_DIR/$run_dir/node.log"
if [ ! -f "$LOG" ]; then continue; fi
panics=$(grep -c -E 'panicked at' "$LOG" || true)
errors=$(grep -c ' ERROR ' "$LOG" || true)
if [ "$panics" -gt 0 ] || [ "$errors" -gt 0 ]; then
if [ "$found" = false ]; then
printf '### ⚠️ Node Errors\n\n' >> "$ERRORS_FILE"
found=true
fi
printf '<details><summary><b>%s</b>: %d panic(s), %d error(s)</summary>\n\n' "$run_dir" "$panics" "$errors" >> "$ERRORS_FILE"
if [ "$panics" -gt 0 ]; then
printf '**Panics:**\n```\n' >> "$ERRORS_FILE"
grep -E 'panicked at' "$LOG" | head -10 >> "$ERRORS_FILE"
printf '```\n' >> "$ERRORS_FILE"
fi
if [ "$errors" -gt 0 ]; then
printf '**Errors (first 20):**\n```\n' >> "$ERRORS_FILE"
grep ' ERROR ' "$LOG" | head -20 >> "$ERRORS_FILE"
printf '```\n' >> "$ERRORS_FILE"
fi
printf '\n</details>\n\n' >> "$ERRORS_FILE"
fi
done
- name: Upload samply profiles
if: success() && env.BENCH_SAMPLY == 'true'
run: |
PROFILER_API="https://api.profiler.firefox.com"
PROFILER_ACCEPT="Accept: application/vnd.firefox-profiler+json;version=1.0"
for run_dir in baseline-1 baseline-2 feature-1 feature-2; do
PROFILE="$BENCH_WORK_DIR/$run_dir/samply-profile.json.gz"
if [ ! -f "$PROFILE" ]; then continue; fi
PROFILE_SIZE=$(du -h "$PROFILE" | cut -f1)
echo "Uploading $run_dir samply profile (${PROFILE_SIZE}) to Firefox Profiler..."
# Upload compressed profile and get JWT back
JWT=$(curl -sf -X POST \
-H "Content-Type: application/octet-stream" \
-H "$PROFILER_ACCEPT" \
--data-binary "@$PROFILE" \
"$PROFILER_API/compressed-store") || {
echo "::warning::Failed to upload $run_dir profile to Firefox Profiler"
continue
}
# Extract profileToken from JWT payload (header.payload.signature)
PAYLOAD=$(echo "$JWT" | cut -d. -f2)
# Fix base64 padding
case $(( ${#PAYLOAD} % 4 )) in
2) PAYLOAD="${PAYLOAD}==" ;;
3) PAYLOAD="${PAYLOAD}=" ;;
esac
PROFILE_TOKEN=$(echo "$PAYLOAD" | base64 -d 2>/dev/null | python3 -c "import sys,json; print(json.load(sys.stdin)['profileToken'])")
PROFILE_URL="https://profiler.firefox.com/public/${PROFILE_TOKEN}"
echo "Profile uploaded: $PROFILE_URL"
# Shorten the URL
SHORT_URL=$(curl -sf -X POST \
-H "Content-Type: application/json" \
-H "$PROFILER_ACCEPT" \
-d "{\"longUrl\":\"$PROFILE_URL\"}" \
"$PROFILER_API/shorten" | python3 -c "import sys,json; print(json.load(sys.stdin)['shortUrl'])" 2>/dev/null) || SHORT_URL="$PROFILE_URL"
echo "$SHORT_URL" > "$BENCH_WORK_DIR/$run_dir/samply-profile-url.txt"
echo "Short profile URL for $run_dir: $SHORT_URL"
done
# Results & charts
- name: Parse results
id: results
@@ -687,7 +873,7 @@ jobs:
uv run --with matplotlib python3 .github/scripts/bench-reth-charts.py $CHART_ARGS
- name: Upload results
if: success()
if: "!cancelled()"
uses: actions/upload-artifact@v6
with:
name: bench-reth-results
@@ -695,31 +881,35 @@ jobs:
- name: Push charts
id: push-charts
if: success() && env.BENCH_PR
if: success()
run: |
PR_NUMBER=${{ env.BENCH_PR }}
PR_NUMBER="${BENCH_PR:-0}"
RUN_ID=${{ github.run_id }}
CHART_DIR="pr/${PR_NUMBER}/${RUN_ID}"
CHARTS_REPO="https://x-access-token:${{ secrets.DEREK_TOKEN }}@github.com/decofe/reth-bench-charts.git"
if git fetch origin bench-charts 2>/dev/null; then
git checkout bench-charts
TMP_DIR=$(mktemp -d)
if git clone --depth 1 "${CHARTS_REPO}" "${TMP_DIR}" 2>/dev/null; then
true
else
git checkout --orphan bench-charts
git rm -rf . 2>/dev/null || true
git init "${TMP_DIR}"
git -C "${TMP_DIR}" remote add origin "${CHARTS_REPO}"
fi
mkdir -p "${CHART_DIR}"
cp "$BENCH_WORK_DIR"/charts/*.png "${CHART_DIR}/"
git add "${CHART_DIR}"
git -c user.name="github-actions" -c user.email="github-actions@github.com" \
mkdir -p "${TMP_DIR}/${CHART_DIR}"
cp "$BENCH_WORK_DIR"/charts/*.png "${TMP_DIR}/${CHART_DIR}/"
git -C "${TMP_DIR}" add "${CHART_DIR}"
git -C "${TMP_DIR}" -c user.name="github-actions" -c user.email="github-actions@github.com" \
commit -m "bench charts for PR #${PR_NUMBER} run ${RUN_ID}"
git push origin bench-charts
echo "sha=$(git rev-parse HEAD)" >> "$GITHUB_OUTPUT"
git -C "${TMP_DIR}" push origin HEAD:main
echo "sha=$(git -C "${TMP_DIR}" rev-parse HEAD)" >> "$GITHUB_OUTPUT"
rm -rf "${TMP_DIR}"
- name: Compare & comment
if: success()
uses: actions/github-script@v8
with:
github-token: ${{ secrets.DEREK_PAT }}
script: |
const fs = require('fs');
@@ -731,11 +921,11 @@ jobs:
}
const sha = '${{ steps.push-charts.outputs.sha }}';
const prNumber = process.env.BENCH_PR;
const prNumber = process.env.BENCH_PR || '0';
const runId = '${{ github.run_id }}';
if (sha && prNumber) {
const baseUrl = `https://raw.githubusercontent.com/${context.repo.owner}/${context.repo.repo}/${sha}/pr/${prNumber}/${runId}`;
if (sha) {
const baseUrl = `https://raw.githubusercontent.com/decofe/reth-bench-charts/${sha}/pr/${prNumber}/${runId}`;
const charts = [
{ file: 'latency_throughput.png', label: 'Latency, Throughput & Diff' },
{ file: 'wait_breakdown.png', label: 'Wait Time Breakdown' },
@@ -750,6 +940,31 @@ jobs:
comment += chartMarkdown;
}
// Samply profile links (URLs point directly to Firefox Profiler)
if (process.env.BENCH_SAMPLY === 'true') {
const runs = ['baseline-1', 'feature-1', 'feature-2', 'baseline-2'];
const links = [];
for (const run of runs) {
try {
const url = fs.readFileSync(`${process.env.BENCH_WORK_DIR}/${run}/samply-profile-url.txt`, 'utf8').trim();
if (url) {
links.push(`- **${run}**: [Firefox Profiler](${url})`);
}
} catch (e) {}
}
if (links.length > 0) {
comment += `\n\n### Samply Profiles\n\n${links.join('\n')}\n`;
}
}
// Node errors (panics / ERROR logs)
try {
const errors = fs.readFileSync(process.env.BENCH_WORK_DIR + '/errors.md', 'utf8');
if (errors.trim()) {
comment += '\n\n' + errors;
}
} catch (e) {}
const jobUrl = process.env.BENCH_JOB_URL || `${context.serverUrl}/${context.repo.owner}/${context.repo.repo}/actions/runs/${context.runId}`;
const body = `cc @${process.env.BENCH_ACTOR}\n\n✅ Benchmark complete! [View job](${jobUrl})\n\n${comment}`;
const ackCommentId = process.env.BENCH_COMMENT_ID;
@@ -766,10 +981,22 @@ jobs:
await core.summary.addRaw(body).write();
}
- name: Send Slack notification (success)
if: success()
uses: actions/github-script@v8
env:
SLACK_BENCH_BOT_TOKEN: ${{ secrets.SLACK_BENCH_BOT_TOKEN }}
SLACK_BENCH_CHANNEL: ${{ secrets.SLACK_BENCH_CHANNEL }}
with:
script: |
const notify = require('./.github/scripts/bench-slack-notify.js');
await notify.success({ core, context });
- name: Update status (failed)
if: failure() && env.BENCH_COMMENT_ID
uses: actions/github-script@v8
with:
github-token: ${{ secrets.DEREK_PAT }}
script: |
const steps_status = [
['building binaries${{ steps.snapshot-check.outputs.needed == 'true' && ' & downloading snapshot' || '' }}', '${{ steps.build.outcome }}'],
@@ -781,19 +1008,54 @@ jobs:
const failed = steps_status.find(([, o]) => o === 'failure');
const failedStep = failed ? failed[0] : 'unknown step';
const fs = require('fs');
let errorDetails = '';
try {
const errors = fs.readFileSync(process.env.BENCH_WORK_DIR + '/errors.md', 'utf8');
if (errors.trim()) {
errorDetails = '\n\n' + errors;
}
} catch (e) {}
const jobUrl = process.env.BENCH_JOB_URL || `${context.serverUrl}/${context.repo.owner}/${context.repo.repo}/actions/runs/${context.runId}`;
await github.rest.issues.updateComment({
owner: context.repo.owner, repo: context.repo.repo,
comment_id: parseInt(process.env.BENCH_COMMENT_ID),
body: `cc @${process.env.BENCH_ACTOR}\n\n❌ Benchmark failed while ${failedStep}. [View logs](${process.env.BENCH_JOB_URL})`,
body: `cc @${process.env.BENCH_ACTOR}\n\n❌ Benchmark failed while ${failedStep}. [View logs](${jobUrl})${errorDetails}`,
});
- name: Upload node log
- name: Send Slack notification (failure)
if: failure()
uses: actions/upload-artifact@v6
uses: actions/github-script@v8
env:
SLACK_BENCH_BOT_TOKEN: ${{ secrets.SLACK_BENCH_BOT_TOKEN }}
SLACK_BENCH_CHANNEL: ${{ secrets.SLACK_BENCH_CHANNEL }}
with:
name: reth-node-log
path: |
${{ env.BENCH_WORK_DIR }}/*/node.log
script: |
const steps_status = [
['building binaries${{ steps.snapshot-check.outputs.needed == 'true' && ' & downloading snapshot' || '' }}', '${{ steps.build.outcome }}'],
['running baseline benchmark (1/2)', '${{ steps.run-baseline-1.outcome }}'],
['running feature benchmark (1/2)', '${{ steps.run-feature-1.outcome }}'],
['running feature benchmark (2/2)', '${{ steps.run-feature-2.outcome }}'],
['running baseline benchmark (2/2)', '${{ steps.run-baseline-2.outcome }}'],
];
const failed = steps_status.find(([, o]) => o === 'failure');
const failedStep = failed ? failed[0] : 'unknown step';
const notify = require('./.github/scripts/bench-slack-notify.js');
await notify.failure({ core, context, failedStep });
- name: Update status (cancelled)
if: cancelled() && env.BENCH_COMMENT_ID
uses: actions/github-script@v8
with:
github-token: ${{ secrets.DEREK_PAT }}
script: |
const jobUrl = process.env.BENCH_JOB_URL || `${context.serverUrl}/${context.repo.owner}/${context.repo.repo}/actions/runs/${context.runId}`;
await github.rest.issues.updateComment({
owner: context.repo.owner, repo: context.repo.repo,
comment_id: parseInt(process.env.BENCH_COMMENT_ID),
body: `cc @${process.env.BENCH_ACTOR}\n\n⚠ Benchmark cancelled. [View logs](${jobUrl})`,
});
- name: Restore system settings
if: always()

View File

@@ -74,7 +74,7 @@ jobs:
profile: maxperf
allow_fail: false
- target: aarch64-unknown-linux-gnu
os: ubuntu-24.04
os: ubuntu-24.04-arm
profile: maxperf
allow_fail: false
- target: x86_64-apple-darwin
@@ -85,10 +85,6 @@ jobs:
os: macos-14
profile: maxperf
allow_fail: false
- target: riscv64gc-unknown-linux-gnu
os: ubuntu-24.04
profile: maxperf
allow_fail: true
build:
- command: build
binary: reth

View File

@@ -172,7 +172,22 @@ Before submitting changes, ensure:
2. **Clippy**: No warnings
3. **Tests Pass**: All unit and integration tests
4. **Documentation**: Update relevant docs and add doc comments with `cargo docs --document-private-items`
5. **Commit Messages**: Follow conventional format (feat:, fix:, chore:, etc.)
5. **CLI Docs** (if CLI changed): Run `make update-book-cli` (see below)
6. **Commit Messages**: Follow conventional format (feat:, fix:, chore:, etc.)
### CLI Reference Docs (`book` CI Job)
The CLI reference pages under `docs/vocs/docs/pages/cli/` are **auto-generated** from the `reth` binary's `--help` output. **Do not edit these files manually** — any hand edits will be overwritten and CI will fail regardless.
When you add, remove, or modify CLI commands, subcommands, or flags, regenerate the CLI docs by running:
```bash
make update-book-cli
```
This builds `reth` in debug mode and runs `docs/cli/update.sh` to regenerate all CLI pages. Commit the resulting changes.
The `book` CI job (`.github/workflows/lint.yml`) enforces this by regenerating the docs and running `git diff --exit-code`. If the committed docs don't match the generated output, CI fails. Manually editing these pages is never productive — always use `make update-book-cli`.
### Opening PRs against <https://github.com/paradigmxyz/reth>
@@ -455,5 +470,8 @@ cargo build --release
cargo check --workspace --all-features
# Check documentation
cargo docs --document-private-items
cargo docs --document-private-items
# Regenerate CLI reference docs (after CLI changes)
make update-book-cli
```

493
Cargo.lock generated

File diff suppressed because it is too large Load Diff

View File

@@ -1,5 +1,5 @@
[workspace.package]
version = "1.11.0"
version = "1.11.1"
edition = "2024"
rust-version = "1.93"
license = "MIT OR Apache-2.0"
@@ -138,6 +138,7 @@ members = [
"examples/exex-subscription",
"examples/exex-test",
"examples/full-contract-state",
"examples/migrate-trie-to-packed",
"examples/manual-p2p/",
"examples/network-txpool/",
"examples/network/",
@@ -397,7 +398,7 @@ reth-payload-builder-primitives = { path = "crates/payload/builder-primitives" }
reth-payload-primitives = { path = "crates/payload/primitives" }
reth-payload-validator = { path = "crates/payload/validator" }
reth-payload-util = { path = "crates/payload/util" }
reth-primitives = { path = "crates/primitives", default-features = false }
reth-primitives = { path = "crates/primitives", default-features = false, features = ["__internal"] }
reth-primitives-traits = { path = "crates/primitives-traits", default-features = false }
reth-provider = { path = "crates/storage/provider" }
reth-prune = { path = "crates/prune/prune" }
@@ -532,13 +533,13 @@ quanta = "0.12"
paste = "1.0"
rand = "0.9"
rayon = "1.7"
thread-priority = "3.0.0"
rustc-hash = { version = "2.0", default-features = false }
schnellru = "0.2"
serde = { version = "1.0", default-features = false }
serde_json = { version = "1.0", default-features = false, features = ["alloc"] }
serde_with = { version = "3", default-features = false, features = ["macros"] }
sha2 = { version = "0.10", default-features = false }
shellexpand = "3.0.0"
shlex = "1.3"
smallvec = "1"
strum = { version = "0.27", default-features = false }

View File

@@ -45,9 +45,6 @@ serde_json.workspace = true
# Time handling
chrono = { workspace = true, features = ["serde"] }
# Path manipulation
shellexpand.workspace = true
# CSV handling
csv.workspace = true

View File

@@ -289,11 +289,7 @@ impl Args {
/// Get the JWT secret path - either provided or derived from datadir
pub(crate) fn jwt_secret_path(&self) -> PathBuf {
match &self.jwt_secret {
Some(path) => {
let jwt_secret_str = path.to_string_lossy();
let expanded = shellexpand::tilde(&jwt_secret_str);
PathBuf::from(expanded.as_ref())
}
Some(path) => path.clone(),
None => {
// Use the same logic as reth: <datadir>/<chain>/jwt.hex
let chain_path = self.datadir.clone().resolve_datadir(self.chain);
@@ -308,10 +304,9 @@ impl Args {
chain_path.data_dir().to_path_buf()
}
/// Get the expanded output directory path
/// Get the output directory path
pub(crate) fn output_dir_path(&self) -> PathBuf {
let expanded = shellexpand::tilde(&self.output_dir);
PathBuf::from(expanded.as_ref())
PathBuf::from(&self.output_dir)
}
/// Get the effective warmup blocks value - either specified or defaults to blocks

View File

@@ -6,7 +6,9 @@ use crate::{
helpers::{build_payload, parse_gas_limit, prepare_payload_request, rpc_block_to_header},
output::GasRampPayloadFile,
},
valid_payload::{call_forkchoice_updated, call_new_payload_with_reth, payload_to_new_payload},
valid_payload::{
call_forkchoice_updated_with_reth, call_new_payload_with_reth, payload_to_new_payload,
},
};
use alloy_eips::BlockNumberOrTag;
use alloy_provider::{network::AnyNetwork, Provider, RootProvider};
@@ -147,7 +149,7 @@ impl Command {
}
}
if self.reth_new_payload {
info!("Using reth_newPayload endpoint");
info!("Using reth_newPayload and reth_forkchoiceUpdated endpoints");
}
let mut blocks_processed = 0u64;
@@ -203,7 +205,13 @@ impl Command {
safe_block_hash: block_hash,
finalized_block_hash: block_hash,
};
call_forkchoice_updated(&provider, version, forkchoice_state, None).await?;
call_forkchoice_updated_with_reth(
&provider,
version,
forkchoice_state,
self.reth_new_payload,
)
.await?;
parent_header = block.header;
parent_hash = block_hash;

View File

@@ -21,7 +21,9 @@ use crate::{
derive_ws_rpc_url, setup_persistence_subscription, PersistenceWaiter,
},
},
valid_payload::{block_to_new_payload, call_forkchoice_updated, call_new_payload_with_reth},
valid_payload::{
block_to_new_payload, call_forkchoice_updated_with_reth, call_new_payload_with_reth,
},
};
use alloy_provider::Provider;
use alloy_rpc_types_engine::ForkchoiceState;
@@ -159,7 +161,7 @@ impl Command {
let mut metrics_scraper = MetricsScraper::maybe_new(self.benchmark.metrics_url.clone());
if use_reth_namespace {
info!("Using reth_newPayload endpoint");
info!("Using reth_newPayload and reth_forkchoiceUpdated endpoints");
}
let buffer_size = self.rpc_block_buffer_size;
@@ -261,7 +263,13 @@ impl Command {
};
let fcu_start = Instant::now();
call_forkchoice_updated(&auth_provider, version, forkchoice_state, None).await?;
call_forkchoice_updated_with_reth(
&auth_provider,
version,
forkchoice_state,
use_reth_namespace,
)
.await?;
let fcu_latency = fcu_start.elapsed();
let total_latency = if server_timings.is_some() {

View File

@@ -24,10 +24,10 @@ use crate::{
derive_ws_rpc_url, setup_persistence_subscription, PersistenceWaiter,
},
},
valid_payload::{call_forkchoice_updated, call_new_payload_with_reth},
valid_payload::{call_forkchoice_updated_with_reth, call_new_payload_with_reth},
};
use alloy_primitives::B256;
use alloy_provider::{ext::EngineApi, network::AnyNetwork, Provider, RootProvider};
use alloy_provider::{network::AnyNetwork, Provider, RootProvider};
use alloy_rpc_client::ClientBuilder;
use alloy_rpc_types_engine::{
CancunPayloadFields, ExecutionData, ExecutionPayloadEnvelopeV4, ExecutionPayloadSidecar,
@@ -184,7 +184,7 @@ impl Command {
);
}
if self.reth_new_payload {
info!("Using reth_newPayload endpoint");
info!("Using reth_newPayload and reth_forkchoiceUpdated endpoints");
}
// Set up waiter based on configured options
@@ -288,7 +288,13 @@ impl Command {
safe_block_hash: parent_hash,
finalized_block_hash: parent_hash,
};
call_forkchoice_updated(&auth_provider, payload.version, fcu_state, None).await?;
call_forkchoice_updated_with_reth(
&auth_provider,
payload.version,
fcu_state,
self.reth_new_payload,
)
.await?;
info!(target: "reth-bench", gas_ramp_payload = i + 1, "Gas ramp payload executed successfully");
@@ -384,10 +390,14 @@ impl Command {
finalized_block_hash: parent_hash,
};
debug!(target: "reth-bench", method = "engine_forkchoiceUpdatedV3", ?fcu_state, "Sending forkchoiceUpdated");
let fcu_start = Instant::now();
let fcu_result = auth_provider.fork_choice_updated_v3(fcu_state, None).await?;
call_forkchoice_updated_with_reth(
&auth_provider,
EngineApiMessageVersion::V4,
fcu_state,
self.reth_new_payload,
)
.await?;
let fcu_latency = fcu_start.elapsed();
let total_latency =
@@ -420,7 +430,6 @@ impl Command {
TotalGasRow { block_number, transaction_count, gas_used, time: current_duration };
results.push((gas_row, combined_result));
debug!(target: "reth-bench", ?fcu_result, "Payload executed successfully");
parent_hash = block_hash;
}

View File

@@ -392,3 +392,47 @@ pub(crate) async fn call_forkchoice_updated<N, P: EngineApiValidWaitExt<N>>(
}
}
}
/// Calls either `reth_forkchoiceUpdated` or the standard `engine_forkchoiceUpdated*` depending
/// on `use_reth`.
///
/// When `use_reth` is true, uses the `reth_forkchoiceUpdated` endpoint which sends a regular FCU
/// with no payload attributes.
pub(crate) async fn call_forkchoice_updated_with_reth<
N: Network,
P: Provider<N> + EngineApiValidWaitExt<N>,
>(
provider: P,
message_version: EngineApiMessageVersion,
forkchoice_state: ForkchoiceState,
use_reth: bool,
) -> TransportResult<ForkchoiceUpdated> {
if use_reth {
let method = "reth_forkchoiceUpdated";
let reth_params = serde_json::to_value((forkchoice_state,))
.expect("ForkchoiceState serialization cannot fail");
debug!(target: "reth-bench", method, "Sending forkchoiceUpdated");
let mut resp: ForkchoiceUpdated = provider.client().request(method, &reth_params).await?;
while !resp.is_valid() {
if resp.is_invalid() {
error!(target: "reth-bench", ?resp, "Invalid {method}");
return Err(alloy_json_rpc::RpcError::LocalUsageError(Box::new(
std::io::Error::other(format!("Invalid {method}: {resp:?}")),
)))
}
if resp.is_syncing() {
return Err(alloy_json_rpc::RpcError::UnsupportedFeature(
"invalid range: no canonical state found for parent of requested block",
))
}
resp = provider.client().request(method, &reth_params).await?;
}
Ok(resp)
} else {
call_forkchoice_updated(provider, message_version, forkchoice_state, None).await
}
}

View File

@@ -30,7 +30,8 @@ workspace = true
# reth
reth-ethereum-cli.workspace = true
reth-chainspec.workspace = true
reth-primitives.workspace = true
reth-primitives-traits.workspace = true
reth-ethereum-primitives.workspace = true
reth-db = { workspace = true, features = ["mdbx"] }
reth-provider.workspace = true
reth-revm.workspace = true
@@ -110,7 +111,6 @@ dev = ["reth-ethereum-cli/dev"]
asm-keccak = [
"reth-node-core/asm-keccak",
"reth-primitives/asm-keccak",
"reth-ethereum-cli/asm-keccak",
"reth-node-ethereum/asm-keccak",
"alloy-primitives/asm-keccak",

View File

@@ -124,9 +124,11 @@ pub mod providers {
pub use reth_provider::*;
}
/// Re-exported from `reth_primitives`.
/// Re-exported primitives.
#[allow(ambiguous_glob_reexports)]
pub mod primitives {
pub use reth_primitives::*;
pub use reth_ethereum_primitives::*;
pub use reth_primitives_traits::*;
}
/// Re-exported from `reth_ethereum_consensus`.

View File

@@ -1061,14 +1061,6 @@ mod tests {
) -> ProviderResult<Option<StorageValue>> {
Ok(None)
}
fn storage_by_hashed_key(
&self,
_address: Address,
_hashed_storage_key: StorageKey,
) -> ProviderResult<Option<StorageValue>> {
Ok(None)
}
}
impl BytecodeReader for MockStateProvider {

View File

@@ -223,26 +223,6 @@ impl<N: NodePrimitives> StateProvider for MemoryOverlayStateProviderRef<'_, N> {
self.historical.storage(address, storage_key)
}
fn storage_by_hashed_key(
&self,
address: Address,
hashed_storage_key: StorageKey,
) -> ProviderResult<Option<StorageValue>> {
let hashed_address = keccak256(address);
let state = &self.trie_input().state;
if let Some(hs) = state.storages.get(&hashed_address) {
if let Some(value) = hs.storage.get(&hashed_storage_key) {
return Ok(Some(*value));
}
if hs.wiped {
return Ok(Some(StorageValue::ZERO));
}
}
self.historical.storage_by_hashed_key(address, hashed_storage_key)
}
}
impl<N: NodePrimitives> BytecodeReader for MemoryOverlayStateProviderRef<'_, N> {

View File

@@ -132,6 +132,6 @@ impl<H: BlockHeader> EthChainSpec for ChainSpec<H> {
}
fn final_paris_total_difficulty(&self) -> Option<U256> {
self.paris_block_and_final_difficulty.map(|(_, final_difficulty)| final_difficulty)
self.get_final_paris_total_difficulty()
}
}

View File

@@ -18,6 +18,5 @@ alloy-genesis.workspace = true
# misc
clap.workspace = true
shellexpand.workspace = true
eyre.workspace = true
serde_json.workspace = true

View File

@@ -73,7 +73,7 @@ pub trait ChainSpecParser: Clone + Send + Sync + 'static {
/// A helper to parse a [`Genesis`](alloy_genesis::Genesis) as argument or from disk.
pub fn parse_genesis(s: &str) -> eyre::Result<alloy_genesis::Genesis> {
// try to read json from path first
let raw = match fs::read_to_string(PathBuf::from(shellexpand::full(s)?.into_owned())) {
let raw = match fs::read_to_string(PathBuf::from(s)) {
Ok(raw) => raw,
Err(io_err) => {
// valid json may start with "\n", but must contain "{"

View File

@@ -53,7 +53,7 @@ reth-tasks.workspace = true
reth-storage-api.workspace = true
reth-trie = { workspace = true, features = ["metrics"] }
reth-trie-db = { workspace = true, features = ["metrics"] }
reth-trie-common.workspace = true
reth-trie-common = { workspace = true, optional = true }
reth-primitives-traits.workspace = true
reth-discv4.workspace = true
reth-discv5.workspace = true
@@ -113,6 +113,7 @@ arbitrary = [
"dep:proptest",
"dep:arbitrary",
"dep:proptest-arbitrary-interop",
"dep:reth-trie-common",
"reth-db-api/arbitrary",
"reth-eth-wire/arbitrary",
"reth-db/arbitrary",
@@ -123,11 +124,11 @@ arbitrary = [
"reth-codecs/test-utils",
"reth-prune-types/test-utils",
"reth-stages-types/test-utils",
"reth-trie-common/test-utils",
"reth-trie-common?/test-utils",
"reth-codecs/arbitrary",
"reth-prune-types/arbitrary",
"reth-stages-types?/arbitrary",
"reth-trie-common/arbitrary",
"reth-trie-common?/arbitrary",
"alloy-consensus/arbitrary",
"reth-primitives-traits/arbitrary",
"reth-ethereum-primitives/arbitrary",

View File

@@ -146,11 +146,24 @@ impl<C: ChainSpecParser> EnvironmentArgs<C> {
})
}
};
let rocksdb_provider = RocksDBProvider::builder(data_dir.rocksdb())
.with_default_tables()
.with_database_log_level(self.db.log_level)
.with_read_only(!access.is_read_write())
.build()?;
let rocksdb_provider = if !access.is_read_write() && !RocksDBProvider::exists(&rocksdb_path)
{
// RocksDB database doesn't exist yet (e.g. datadir restored from a snapshot
// or created before RocksDB storage). Create an empty one so read-only
// commands can proceed.
debug!(target: "reth::cli", ?rocksdb_path, "RocksDB not found, initializing empty database");
reth_fs_util::create_dir_all(&rocksdb_path)?;
RocksDBProvider::builder(data_dir.rocksdb())
.with_default_tables()
.with_database_log_level(self.db.log_level)
.build()?
} else {
RocksDBProvider::builder(data_dir.rocksdb())
.with_default_tables()
.with_database_log_level(self.db.log_level)
.with_read_only(!access.is_read_write())
.build()?
};
let provider_factory =
self.create_provider_factory(&config, db, sfp, rocksdb_provider, access, runtime)?;

View File

@@ -14,7 +14,7 @@ use reth_db_api::{
use reth_db_common::DbTool;
use reth_node_builder::{NodeTypesWithDB, NodeTypesWithDBAdapter};
use reth_provider::{providers::ProviderNodeTypes, DBProvider, StaticFileProviderFactory};
use reth_static_file_types::StaticFileSegment;
use reth_static_file_types::{ChangesetOffset, StaticFileSegment};
use std::{
hash::{BuildHasher, Hasher},
time::{Duration, Instant},
@@ -134,12 +134,12 @@ fn checksum_static_file<N: CliNodeTypes<ChainSpec: EthereumHardforks>>(
.ok_or_else(|| eyre::eyre!("No static files found for segment: {}", segment))?;
let start_time = Instant::now();
let mut hasher = checksum_hasher();
let mut total = 0usize;
let limit = limit.unwrap_or(usize::MAX);
let mut checksummer = Checksummer::new(checksum_hasher(), limit);
let start_block = start_block.unwrap_or(0);
let end_block = end_block.unwrap_or(u64::MAX);
let is_change_based = segment.is_change_based();
info!(
"Computing checksum for {} static files, start_block={}, end_block={}, limit={:?}",
@@ -149,7 +149,8 @@ fn checksum_static_file<N: CliNodeTypes<ChainSpec: EthereumHardforks>>(
if limit == usize::MAX { None } else { Some(limit) }
);
'outer: for (block_range, _header) in ranges.iter().sorted_by_key(|(range, _)| range.start()) {
let mut reached_limit = false;
for (block_range, _header) in ranges.iter().sorted_by_key(|(range, _)| range.start()) {
if block_range.end() < start_block || block_range.start() > end_block {
continue;
}
@@ -167,28 +168,42 @@ fn checksum_static_file<N: CliNodeTypes<ChainSpec: EthereumHardforks>>(
let mut cursor = jar_provider.cursor()?;
while let Ok(Some(row)) = cursor.next_row() {
for col_data in row.iter() {
hasher.write(col_data);
}
if is_change_based {
let offsets = jar_provider.read_changeset_offsets()?.ok_or_else(|| {
eyre::eyre!(
"Missing changeset offsets sidecar for segment {} at range {}",
segment,
block_range
)
})?;
let input = ChangeBasedChecksumInput {
segment,
block_range_start: block_range.start(),
start_block,
end_block,
offsets: &offsets,
};
total += 1;
if total.is_multiple_of(PROGRESS_LOG_INTERVAL) {
info!("Hashed {total} entries.");
}
if total >= limit {
break 'outer;
reached_limit = checksum_change_based_segment(&mut checksummer, input, &mut cursor)?;
} else {
while let Some(row) = cursor.next_row()? {
if checksummer.write_row(&row) {
reached_limit = true;
break;
}
}
}
// Explicitly drop provider before removing from cache to avoid deadlock
drop(jar_provider);
static_file_provider.remove_cached_provider(segment, fixed_block_range.end());
if reached_limit {
break;
}
}
let checksum = hasher.finish();
let (checksum, total) = checksummer.finish();
let elapsed = start_time.elapsed();
info!(
@@ -267,7 +282,7 @@ impl<N: ProviderNodeTypes> TableViewer<(u64, Duration)> for ChecksumViewer<'_, N
total = index + 1;
if total >= limit {
break
break;
}
}
@@ -285,3 +300,139 @@ impl<N: ProviderNodeTypes> TableViewer<(u64, Duration)> for ChecksumViewer<'_, N
Ok((checksum, elapsed))
}
}
/// Accumulates a checksum over key-value entries, tracking count and limit.
struct Checksummer<H> {
hasher: H,
total: usize,
limit: usize,
}
impl<H: Hasher> Checksummer<H> {
fn new(hasher: H, limit: usize) -> Self {
Self { hasher, total: 0, limit }
}
/// Hash a row's columns (non-changeset segments). Returns `true` if the limit is reached.
fn write_row(&mut self, row: &[&[u8]]) -> bool {
for col in row {
self.hasher.write(col);
}
self.advance()
}
/// Hash a key + value as two separate writes, matching MDBX raw entry semantics.
/// Write boundaries matter: foldhash rotates its accumulator by `len` on each `write`.
fn write_entry(&mut self, key: &[u8], value: &[u8]) -> bool {
self.hasher.write(key);
self.hasher.write(value);
self.advance()
}
fn advance(&mut self) -> bool {
self.total += 1;
if self.total.is_multiple_of(PROGRESS_LOG_INTERVAL) {
info!("Hashed {} entries.", self.total);
}
self.total >= self.limit
}
fn finish(self) -> (u64, usize) {
(self.hasher.finish(), self.total)
}
}
/// Reconstruct MDBX `StorageChangeSets` key/value boundaries from a static-file row.
///
/// MDBX layout:
/// - key: `BlockNumberAddress` => `[8B block_number][20B address]`
/// - value: `StorageEntry` => `[32B storage_key][compact U256 value]`
///
/// Static-file row layout for `StorageBeforeTx`:
/// - `[20B address][32B storage_key][compact U256 value]`
fn split_storage_changeset_row(block_number: u64, row: &[u8]) -> eyre::Result<([u8; 28], &[u8])> {
if row.len() < 20 {
return Err(eyre::eyre!(
"Storage changeset row too short: expected at least 20 bytes, got {}",
row.len()
));
}
let mut key_buf = [0u8; 28];
key_buf[..8].copy_from_slice(&block_number.to_be_bytes());
key_buf[8..].copy_from_slice(&row[..20]);
Ok((key_buf, &row[20..]))
}
struct ChangeBasedChecksumInput<'a> {
segment: StaticFileSegment,
block_range_start: u64,
start_block: u64,
end_block: u64,
offsets: &'a [ChangesetOffset],
}
fn checksum_change_based_segment<H: Hasher>(
checksummer: &mut Checksummer<H>,
input: ChangeBasedChecksumInput<'_>,
cursor: &mut reth_db::static_file::StaticFileCursor<'_>,
) -> eyre::Result<bool> {
let ChangeBasedChecksumInput { segment, block_range_start, start_block, end_block, offsets } =
input;
let is_storage = segment.is_storage_change_sets();
let mut reached_limit = false;
for (offset_index, offset) in offsets.iter().enumerate() {
let block_number = block_range_start + offset_index as u64;
let include = block_number >= start_block && block_number <= end_block;
for _ in 0..offset.num_changes() {
let row = cursor.next_row()?.ok_or_else(|| {
eyre::eyre!(
"Unexpected EOF while checksumming {} static file at range starting {}",
segment,
block_range_start
)
})?;
if !include {
continue;
}
// Reconstruct MDBX key/value write boundaries. foldhash rotates
// its accumulator by `len` on each write(), so boundaries must
// match exactly.
let done = if is_storage {
// StorageChangeSets: MDBX key = BlockNumberAddress (28B),
// value = compact StorageEntry. Column 0 is compact
// StorageBeforeTx = [20B address][32B key][compact U256].
let col = row[0];
let (key, value) = split_storage_changeset_row(block_number, col)?;
checksummer.write_entry(&key, value)
} else {
// AccountChangeSets: MDBX key = BlockNumber (8B),
// value = compact AccountBeforeTx (= column 0).
checksummer.write_entry(&block_number.to_be_bytes(), row[0])
};
if done {
reached_limit = true;
break;
}
}
if reached_limit {
break;
}
}
if !reached_limit && cursor.next_row()?.is_some() {
return Err(eyre::eyre!(
"Changeset offsets do not cover all rows for {} at range starting {}",
segment,
block_range_start
));
}
Ok(reached_limit)
}

View File

@@ -98,7 +98,7 @@ impl Command {
)?;
if let Some(entry) = entry {
let se: reth_primitives_traits::StorageEntry = entry.into();
let se: reth_primitives_traits::StorageEntry = entry;
println!("{}", serde_json::to_string_pretty(&se)?);
} else {
error!(target: "reth::cli", "No content for the given table key.");
@@ -110,7 +110,7 @@ impl Command {
let serializable: Vec<_> = changesets
.into_iter()
.map(|(addr, entry)| {
let se: reth_primitives_traits::StorageEntry = entry.into();
let se: reth_primitives_traits::StorageEntry = entry;
(addr, se)
})
.collect();

View File

@@ -21,6 +21,7 @@ mod repair_trie;
mod settings;
mod state;
mod static_file_header;
mod static_files;
mod stats;
/// DB List TUI
mod tui;
@@ -62,6 +63,8 @@ pub enum Subcommands {
RepairTrie(repair_trie::Command),
/// Reads and displays the static file segment header
StaticFileHeader(static_file_header::Command),
/// Static file operations (split, etc.)
StaticFiles(static_files::Command),
/// Lists current and local database versions
Version,
/// Returns the full database path
@@ -185,6 +188,11 @@ impl<C: ChainSpecParser<ChainSpec: EthChainSpec + EthereumHardforks>> Command<C>
command.execute(&tool)?;
});
}
Subcommands::StaticFiles(command) => {
db_exec!(self.env, tool, N, AccessRights::RO, {
command.execute(&tool)?;
});
}
Subcommands::Version => {
let local_db_version = match get_db_version(&db_path) {
Ok(version) => Some(version),

View File

@@ -5,7 +5,6 @@ use reth_cli_util::parse_socket_address;
use reth_db_api::{
cursor::{DbCursorRO, DbCursorRW, DbDupCursorRO},
database::Database,
tables,
transaction::{DbTx, DbTxMut},
};
use reth_db_common::DbTool;
@@ -21,13 +20,15 @@ use reth_node_metrics::{
};
use reth_provider::{providers::ProviderNodeTypes, ChainSpecProvider, StageCheckpointReader};
use reth_stages::StageId;
use reth_storage_api::StorageSettingsCache;
use reth_tasks::TaskExecutor;
use reth_trie::{
verify::{Output, Verifier},
Nibbles,
};
use reth_trie_common::{StorageTrieEntry, StoredNibbles, StoredNibblesSubKey};
use reth_trie_db::{DatabaseHashedCursorFactory, DatabaseTrieCursorFactory};
use reth_trie_db::{
DatabaseHashedCursorFactory, DatabaseTrieCursorFactory, StorageTrieEntryLike, TrieTableAdapter,
};
use std::{
net::SocketAddr,
time::{Duration, Instant},
@@ -116,9 +117,13 @@ fn verify_only<N: ProviderNodeTypes>(tool: &DbTool<N>) -> eyre::Result<()> {
let mut tx = db.tx()?;
tx.disable_long_read_transaction_safety();
reth_trie_db::with_adapter!(tool.provider_factory, |A| do_verify_only::<_, A>(&tx))
}
fn do_verify_only<TX: DbTx, A: TrieTableAdapter>(tx: &TX) -> eyre::Result<()> {
// Create the verifier
let hashed_cursor_factory = DatabaseHashedCursorFactory::new(&tx);
let trie_cursor_factory = DatabaseTrieCursorFactory::new(&tx);
let hashed_cursor_factory = DatabaseHashedCursorFactory::new(tx);
let trie_cursor_factory = DatabaseTrieCursorFactory::<_, A>::new(tx);
let verifier = Verifier::new(&trie_cursor_factory, hashed_cursor_factory)?;
let metrics = RepairTrieMetrics::new();
@@ -209,17 +214,37 @@ fn verify_and_repair<N: ProviderNodeTypes>(tool: &DbTool<N>) -> eyre::Result<()>
// Check that a pipeline sync isn't in progress.
verify_checkpoints(provider_rw.as_ref())?;
let inconsistent_nodes = reth_trie_db::with_adapter!(tool.provider_factory, |A| {
do_verify_and_repair::<_, A>(&mut provider_rw)?
});
if inconsistent_nodes == 0 {
info!("No inconsistencies found");
} else {
provider_rw.commit()?;
info!("Repaired {} inconsistencies and committed changes", inconsistent_nodes);
}
Ok(())
}
fn do_verify_and_repair<N: ProviderNodeTypes, A: TrieTableAdapter>(
provider_rw: &mut reth_provider::DatabaseProviderRW<N::DB, N>,
) -> eyre::Result<usize>
where
<N::DB as reth_db_api::database::Database>::TXMut: DbTxMut + DbTx,
{
// Create cursors for making modifications with
let tx = provider_rw.tx_mut();
tx.disable_long_read_transaction_safety();
let mut account_trie_cursor = tx.cursor_write::<tables::AccountsTrie>()?;
let mut storage_trie_cursor = tx.cursor_dup_write::<tables::StoragesTrie>()?;
let mut account_trie_cursor = tx.cursor_write::<A::AccountTrieTable>()?;
let mut storage_trie_cursor = tx.cursor_dup_write::<A::StorageTrieTable>()?;
// Create the cursor factories. These cannot accept the `&mut` tx above because they require it
// to be AsRef.
// Create the cursor factories. These cannot accept the `&mut` tx above because they
// require it to be AsRef.
let tx = provider_rw.tx_ref();
let hashed_cursor_factory = DatabaseHashedCursorFactory::new(tx);
let trie_cursor_factory = DatabaseTrieCursorFactory::new(tx);
let trie_cursor_factory = DatabaseTrieCursorFactory::<_, A>::new(tx);
// Create the verifier
let verifier = Verifier::new(&trie_cursor_factory, hashed_cursor_factory)?;
@@ -257,17 +282,17 @@ fn verify_and_repair<N: ProviderNodeTypes>(tool: &DbTool<N>) -> eyre::Result<()>
match output {
Output::AccountExtra(path, _node) => {
// Extra account node in trie, remove it
let nibbles = StoredNibbles(path);
if account_trie_cursor.seek_exact(nibbles)?.is_some() {
let key: A::AccountKey = path.into();
if account_trie_cursor.seek_exact(key)?.is_some() {
account_trie_cursor.delete_current()?;
}
}
Output::StorageExtra(account, path, _node) => {
// Extra storage node in trie, remove it
let nibbles = StoredNibblesSubKey(path);
let subkey: A::StorageSubKey = path.into();
if storage_trie_cursor
.seek_by_key_subkey(account, nibbles.clone())?
.filter(|e| e.nibbles == nibbles)
.seek_by_key_subkey(account, subkey.clone())?
.filter(|e| *e.nibbles() == subkey)
.is_some()
{
storage_trie_cursor.delete_current()?;
@@ -276,23 +301,23 @@ fn verify_and_repair<N: ProviderNodeTypes>(tool: &DbTool<N>) -> eyre::Result<()>
Output::AccountWrong { path, expected: node, .. } |
Output::AccountMissing(path, node) => {
// Wrong/missing account node value, upsert it
let nibbles = StoredNibbles(path);
account_trie_cursor.upsert(nibbles, &node)?;
let key: A::AccountKey = path.into();
account_trie_cursor.upsert(key, &node)?;
}
Output::StorageWrong { account, path, expected: node, .. } |
Output::StorageMissing(account, path, node) => {
// Wrong/missing storage node value, upsert it
// (We can't just use `upsert` method with a dup cursor, it's not properly
// supported)
let nibbles = StoredNibblesSubKey(path);
let subkey: A::StorageSubKey = path.into();
let entry = A::StorageValue::new(subkey.clone(), node);
if storage_trie_cursor
.seek_by_key_subkey(account, nibbles.clone())?
.filter(|v| v.nibbles == nibbles)
.seek_by_key_subkey(account, subkey.clone())?
.filter(|v| *v.nibbles() == subkey)
.is_some()
{
storage_trie_cursor.delete_current()?;
}
let entry = StorageTrieEntry { nibbles, node };
storage_trie_cursor.upsert(account, &entry)?;
}
Output::Progress(path) => {
@@ -304,14 +329,7 @@ fn verify_and_repair<N: ProviderNodeTypes>(tool: &DbTool<N>) -> eyre::Result<()>
}
}
if inconsistent_nodes == 0 {
info!("No inconsistencies found");
} else {
provider_rw.commit()?;
info!("Repaired {} inconsistencies and committed changes", inconsistent_nodes);
}
Ok(())
Ok(inconsistent_nodes as usize)
}
/// Output progress information based on the last seen account path.

View File

@@ -0,0 +1,31 @@
//! Static file related CLI commands
mod split;
pub use split::SplitCommand;
use clap::{Parser, Subcommand};
use reth_db_common::DbTool;
use reth_provider::providers::ProviderNodeTypes;
/// Static files subcommands
#[derive(Debug, Parser)]
pub struct Command {
#[command(subcommand)]
command: Subcommands,
}
#[derive(Debug, Subcommand)]
enum Subcommands {
/// Split static files into new files with different blocks-per-file setting
Split(SplitCommand),
}
impl Command {
/// Execute the static files command
pub fn execute<N: ProviderNodeTypes>(self, tool: &DbTool<N>) -> eyre::Result<()> {
match self.command {
Subcommands::Split(cmd) => cmd.execute(tool),
}
}
}

View File

@@ -0,0 +1,703 @@
use clap::Parser;
use reth_codecs::Compact;
use reth_db::{
cursor::DbCursorRO,
static_file::{
AccountChangesetMask, BlockHashMask, HeaderMask, ReceiptMask, StorageChangesetMask,
TotalDifficultyMask, TransactionMask, TransactionSenderMask,
},
tables,
transaction::DbTx,
};
use reth_db_api::models::CompactU256;
use reth_db_common::DbTool;
use reth_primitives_traits::NodePrimitives;
use reth_provider::{
providers::{ProviderNodeTypes, StaticFileProvider},
StaticFileProviderBuilder, StaticFileProviderFactory, StaticFileWriter,
};
use reth_static_file_types::StaticFileSegment;
use std::path::PathBuf;
use tracing::info;
/// Split static files into new files with different blocks-per-file setting
#[derive(Debug, Parser)]
pub struct SplitCommand {
/// Source static files directory.
/// If not specified, uses the datadir's static_files directory.
#[arg(long, value_name = "PATH")]
static_files_dir: Option<PathBuf>,
/// Output directory for the new static files.
/// Required unless --in-place is specified.
#[arg(long, value_name = "PATH", required_unless_present = "in_place")]
output_dir: Option<PathBuf>,
/// Number of blocks per output file
#[arg(long, value_name = "NUM")]
blocks_per_file: u64,
/// Segments to split (default: all)
#[arg(long, value_delimiter = ',')]
segments: Option<Vec<StaticFileSegment>>,
/// Start block number (default: 0)
#[arg(long)]
from_block: Option<u64>,
/// End block number (default: highest available)
#[arg(long)]
to_block: Option<u64>,
/// Print what would be done without writing
#[arg(long)]
dry_run: bool,
/// Split in-place: write to temp dir, verify, then atomically swap.
/// Original files are preserved in static_files.bak
#[arg(long, conflicts_with = "output_dir")]
in_place: bool,
/// Skip verification step when using --in-place
#[arg(long, requires = "in_place")]
skip_verify: bool,
}
impl SplitCommand {
/// Execute the split command
pub fn execute<N: ProviderNodeTypes>(self, tool: &DbTool<N>) -> eyre::Result<()>
where
N::Primitives: NodePrimitives<BlockHeader: Compact, SignedTx: Compact, Receipt: Compact>,
{
let segments = self.segments.clone().unwrap_or_else(|| StaticFileSegment::iter().collect());
// Use custom static files dir if provided, otherwise use datadir's static files
let (source_provider, source_dir) =
if let Some(ref static_files_dir) = self.static_files_dir {
let provider = StaticFileProviderBuilder::read_only(static_files_dir)
.build::<N::Primitives>()?;
let dir = static_files_dir.clone();
(provider, dir)
} else {
let provider = tool.provider_factory.static_file_provider();
let dir = provider.directory().to_path_buf();
(provider, dir)
};
// Determine output directory
let (output_dir, is_in_place) = if self.in_place {
let temp_dir = source_dir.with_file_name("static_files.tmp");
(temp_dir, true)
} else {
(self.output_dir.clone().expect("output_dir required when not in_place"), false)
};
info!(
target: "reth::cli",
output_dir = %output_dir.display(),
blocks_per_file = self.blocks_per_file,
?segments,
from_block = ?self.from_block,
to_block = ?self.to_block,
dry_run = self.dry_run,
in_place = is_in_place,
"Splitting static files"
);
if self.dry_run {
println!("Dry run mode - no files will be written");
if is_in_place {
println!("In-place mode:");
println!(" 1. Write to: {}", output_dir.display());
println!(" 2. Verify output integrity");
println!(" 3. Rename {} -> {}.bak", source_dir.display(), source_dir.display());
println!(" 4. Rename {} -> {}", output_dir.display(), source_dir.display());
}
for segment in &segments {
let min_block = source_provider.get_lowest_range_start(*segment);
let max_block = source_provider.get_highest_static_file_block(*segment);
if let (Some(min_block), Some(max_block)) = (min_block, max_block) {
let from_block = self.from_block.unwrap_or(min_block).max(min_block);
let to_block = self.to_block.unwrap_or(max_block).min(max_block);
let num_blocks = to_block.saturating_sub(from_block) + 1;
let num_files = num_blocks.div_ceil(self.blocks_per_file);
println!(
" {segment}: blocks {from_block}..={to_block} ({num_blocks} blocks) -> {num_files} files"
);
} else {
println!(" {segment}: no data available");
}
}
return Ok(());
}
// Clean up output directory if it exists
// For in-place mode: remove previous incomplete temp directory
// For regular mode: ensure we start fresh to avoid block number mismatches
if output_dir.exists() {
info!(target: "reth::cli", output_dir = %output_dir.display(), "Removing existing output directory");
reth_fs_util::remove_dir_all(&output_dir)?;
}
reth_fs_util::create_dir_all(&output_dir)?;
// Calculate segment ranges first to determine the global starting block
let mut segment_ranges = Vec::new();
for &segment in &segments {
let Some(min_block) = source_provider.get_lowest_range_start(segment) else {
continue;
};
let Some(max_block) = source_provider.get_highest_static_file_block(segment) else {
continue;
};
let from_block = self.from_block.unwrap_or(min_block).max(min_block);
let to_block = self.to_block.unwrap_or(max_block).min(max_block);
if from_block <= to_block {
segment_ranges.push((segment, from_block, to_block));
}
}
for (segment, from_block, to_block) in segment_ranges {
info!(target: "reth::cli", ?segment, from_block, to_block, "Processing segment");
// Build output provider per-segment with genesis_block_number set to this segment's
// starting block. This prevents the writer from trying to load non-existent previous
// files when segments have different starting blocks (e.g., pruned transactions).
let output_provider = StaticFileProviderBuilder::read_write(&output_dir)
.with_blocks_per_file(self.blocks_per_file)
.with_genesis_block_number(from_block)
.build::<N::Primitives>()?;
match segment {
StaticFileSegment::Headers => {
self.split_headers::<N>(
&source_provider,
&output_provider,
from_block,
to_block,
)?;
}
StaticFileSegment::Transactions => {
self.split_transactions::<N>(
tool,
&source_provider,
&output_provider,
from_block,
to_block,
)?;
}
StaticFileSegment::Receipts => {
self.split_receipts::<N>(
tool,
&source_provider,
&output_provider,
from_block,
to_block,
)?;
}
StaticFileSegment::TransactionSenders => {
self.split_transaction_senders::<N>(
tool,
&source_provider,
&output_provider,
from_block,
to_block,
)?;
}
StaticFileSegment::AccountChangeSets => {
self.split_account_changesets::<N>(
&source_provider,
&output_provider,
from_block,
to_block,
)?;
}
StaticFileSegment::StorageChangeSets => {
self.split_storage_changesets::<N>(
&source_provider,
&output_provider,
from_block,
to_block,
)?;
}
}
info!(target: "reth::cli", ?segment, "Segment complete");
// Drop the output provider to release file handles before processing next segment
drop(output_provider);
}
// In-place mode: verify and swap directories
if is_in_place {
// Verification step
if !self.skip_verify {
info!(target: "reth::cli", "Verifying output integrity");
self.verify_output::<N>(&output_dir, &segments)?;
}
// Atomic swap
let backup_dir = source_dir.with_file_name("static_files.bak");
// Remove old backup if exists
if backup_dir.exists() {
info!(target: "reth::cli", backup_dir = %backup_dir.display(), "Removing old backup");
reth_fs_util::remove_dir_all(&backup_dir)?;
}
// Drop source provider to release file handles
drop(source_provider);
// Rename: source -> backup
info!(target: "reth::cli",
from = %source_dir.display(),
to = %backup_dir.display(),
"Moving original to backup"
);
reth_fs_util::rename(&source_dir, &backup_dir)?;
// Rename: temp -> source
info!(target: "reth::cli",
from = %output_dir.display(),
to = %source_dir.display(),
"Moving new files into place"
);
reth_fs_util::rename(&output_dir, &source_dir)?;
info!(target: "reth::cli",
backup = %backup_dir.display(),
"In-place split complete. Original files preserved in backup directory"
);
}
info!(target: "reth::cli", "Static file split complete");
Ok(())
}
/// Verify the output static files have valid data
fn verify_output<N: ProviderNodeTypes>(
&self,
output_dir: &PathBuf,
segments: &[StaticFileSegment],
) -> eyre::Result<()> {
let provider = StaticFileProviderBuilder::read_only(output_dir).build::<N::Primitives>()?;
for &segment in segments {
let Some(lowest) = provider.get_lowest_range_start(segment) else {
return Err(eyre::eyre!("Verification failed: no data for segment {segment}"));
};
let Some(highest) = provider.get_highest_static_file_block(segment) else {
return Err(eyre::eyre!("Verification failed: no data for segment {segment}"));
};
// Verify we can read the first and last blocks
provider.get_segment_provider(segment, lowest)?;
provider.get_segment_provider(segment, highest)?;
info!(target: "reth::cli", ?segment, from_block = lowest, to_block = highest, "Verified");
}
Ok(())
}
fn split_headers<N: ProviderNodeTypes>(
&self,
source: &StaticFileProvider<N::Primitives>,
output: &StaticFileProvider<N::Primitives>,
from_block: u64,
to_block: u64,
) -> eyre::Result<()>
where
<N::Primitives as NodePrimitives>::BlockHeader: Compact,
{
let mut writer = output.get_writer(from_block, StaticFileSegment::Headers)?;
for block in from_block..=to_block {
let jar = source.get_segment_provider(StaticFileSegment::Headers, block)?;
let mut cursor = jar.cursor()?;
let header: <N::Primitives as NodePrimitives>::BlockHeader = cursor
.get_one::<HeaderMask<_>>(block.into())?
.ok_or_else(|| eyre::eyre!("Missing header for block {block}"))?;
let td: CompactU256 = cursor
.get_one::<TotalDifficultyMask>(block.into())?
.ok_or_else(|| eyre::eyre!("Missing TD for block {block}"))?;
let hash = cursor
.get_one::<BlockHashMask>(block.into())?
.ok_or_else(|| eyre::eyre!("Missing hash for block {block}"))?;
writer.append_header_with_td(&header, td.into(), &hash)?;
if block % 100_000 == 0 {
info!(target: "reth::cli", block, to_block, "Headers progress");
}
}
writer.commit()?;
Ok(())
}
fn split_transactions<N: ProviderNodeTypes>(
&self,
tool: &DbTool<N>,
source: &StaticFileProvider<N::Primitives>,
output: &StaticFileProvider<N::Primitives>,
from_block: u64,
to_block: u64,
) -> eyre::Result<()>
where
<N::Primitives as NodePrimitives>::SignedTx: Compact,
{
let tx = tool.provider_factory.provider()?.into_tx();
let mut indices_cursor = tx.cursor_read::<tables::BlockBodyIndices>()?;
let mut writer = output.get_writer(from_block, StaticFileSegment::Transactions)?;
for block in from_block..=to_block {
writer.increment_block(block)?;
if let Some(indices) = indices_cursor.seek_exact(block)?.map(|(_, v)| v) {
let first_tx = indices.first_tx_num;
let tx_count = indices.tx_count;
for tx_num in first_tx..first_tx + tx_count {
let jar =
source.get_segment_provider(StaticFileSegment::Transactions, tx_num)?;
let mut cursor = jar.cursor()?;
let transaction: <N::Primitives as NodePrimitives>::SignedTx = cursor
.get_one::<TransactionMask<_>>(tx_num.into())?
.ok_or_else(|| eyre::eyre!("Missing transaction {tx_num}"))?;
writer.append_transaction(tx_num, &transaction)?;
}
}
if block % 100_000 == 0 {
info!(target: "reth::cli", block, to_block, "Transactions progress");
}
}
writer.commit()?;
Ok(())
}
fn split_receipts<N: ProviderNodeTypes>(
&self,
tool: &DbTool<N>,
source: &StaticFileProvider<N::Primitives>,
output: &StaticFileProvider<N::Primitives>,
from_block: u64,
to_block: u64,
) -> eyre::Result<()>
where
<N::Primitives as NodePrimitives>::Receipt: Compact,
{
let tx = tool.provider_factory.provider()?.into_tx();
let mut indices_cursor = tx.cursor_read::<tables::BlockBodyIndices>()?;
let mut writer = output.get_writer(from_block, StaticFileSegment::Receipts)?;
for block in from_block..=to_block {
writer.increment_block(block)?;
if let Some(indices) = indices_cursor.seek_exact(block)?.map(|(_, v)| v) {
let first_tx = indices.first_tx_num;
let tx_count = indices.tx_count;
for tx_num in first_tx..first_tx + tx_count {
let jar = source.get_segment_provider(StaticFileSegment::Receipts, tx_num)?;
let mut cursor = jar.cursor()?;
let receipt: <N::Primitives as NodePrimitives>::Receipt = cursor
.get_one::<ReceiptMask<_>>(tx_num.into())?
.ok_or_else(|| eyre::eyre!("Missing receipt {tx_num}"))?;
writer.append_receipt(tx_num, &receipt)?;
}
}
if block % 100_000 == 0 {
info!(target: "reth::cli", block, to_block, "Receipts progress");
}
}
writer.commit()?;
Ok(())
}
fn split_transaction_senders<N: ProviderNodeTypes>(
&self,
tool: &DbTool<N>,
source: &StaticFileProvider<N::Primitives>,
output: &StaticFileProvider<N::Primitives>,
from_block: u64,
to_block: u64,
) -> eyre::Result<()> {
let tx = tool.provider_factory.provider()?.into_tx();
let mut indices_cursor = tx.cursor_read::<tables::BlockBodyIndices>()?;
let mut writer = output.get_writer(from_block, StaticFileSegment::TransactionSenders)?;
for block in from_block..=to_block {
writer.increment_block(block)?;
if let Some(indices) = indices_cursor.seek_exact(block)?.map(|(_, v)| v) {
let first_tx = indices.first_tx_num;
let tx_count = indices.tx_count;
for tx_num in first_tx..first_tx + tx_count {
let jar = source
.get_segment_provider(StaticFileSegment::TransactionSenders, tx_num)?;
let mut cursor = jar.cursor()?;
let sender = cursor
.get_one::<TransactionSenderMask>(tx_num.into())?
.ok_or_else(|| eyre::eyre!("Missing sender {tx_num}"))?;
writer.append_transaction_sender(tx_num, &sender)?;
}
}
if block % 100_000 == 0 {
info!(target: "reth::cli", block, to_block, "Transaction senders progress");
}
}
writer.commit()?;
Ok(())
}
fn split_account_changesets<N: ProviderNodeTypes>(
&self,
source: &StaticFileProvider<N::Primitives>,
output: &StaticFileProvider<N::Primitives>,
from_block: u64,
to_block: u64,
) -> eyre::Result<()> {
let mut writer = output.get_writer(from_block, StaticFileSegment::AccountChangeSets)?;
for block in from_block..=to_block {
let jar = source.get_segment_provider(StaticFileSegment::AccountChangeSets, block)?;
let mut changes = Vec::new();
if let Some(offset) = jar.read_changeset_offset(block)? {
let mut cursor = jar.cursor()?;
for i in offset.changeset_range() {
if let Some(change) = cursor.get_one::<AccountChangesetMask>(i.into())? {
changes.push(change);
}
}
}
writer.append_account_changeset(changes, block)?;
if block % 100_000 == 0 {
info!(target: "reth::cli", block, to_block, "Account changesets progress");
}
}
writer.commit()?;
Ok(())
}
fn split_storage_changesets<N: ProviderNodeTypes>(
&self,
source: &StaticFileProvider<N::Primitives>,
output: &StaticFileProvider<N::Primitives>,
from_block: u64,
to_block: u64,
) -> eyre::Result<()> {
let mut writer = output.get_writer(from_block, StaticFileSegment::StorageChangeSets)?;
for block in from_block..=to_block {
let jar = source.get_segment_provider(StaticFileSegment::StorageChangeSets, block)?;
let mut changes = Vec::new();
if let Some(offset) = jar.read_changeset_offset(block)? {
let mut cursor = jar.cursor()?;
for i in offset.changeset_range() {
if let Some(change) = cursor.get_one::<StorageChangesetMask>(i.into())? {
changes.push(change);
}
}
}
writer.append_storage_changeset(changes, block)?;
if block % 100_000 == 0 {
info!(target: "reth::cli", block, to_block, "Storage changesets progress");
}
}
writer.commit()?;
Ok(())
}
}
#[cfg(test)]
mod tests {
use super::*;
use clap::Parser;
#[derive(Parser)]
struct TestCli {
#[command(subcommand)]
command: TestCommand,
}
#[derive(clap::Subcommand)]
enum TestCommand {
Split(SplitCommand),
}
#[test]
fn parse_split_command_minimal() {
let args = TestCli::try_parse_from([
"test",
"split",
"--output-dir",
"/tmp/output",
"--blocks-per-file",
"100000",
])
.unwrap();
match args.command {
TestCommand::Split(cmd) => {
assert_eq!(cmd.output_dir, Some(PathBuf::from("/tmp/output")));
assert_eq!(cmd.blocks_per_file, 100000);
assert!(cmd.segments.is_none());
assert!(cmd.from_block.is_none());
assert!(cmd.to_block.is_none());
assert!(!cmd.dry_run);
assert!(!cmd.in_place);
}
}
}
#[test]
fn parse_split_command_full() {
let args = TestCli::try_parse_from([
"test",
"split",
"--output-dir",
"/tmp/output",
"--blocks-per-file",
"50000",
"--segments",
"headers,receipts",
"--from-block",
"1000",
"--to-block",
"500000",
"--dry-run",
])
.unwrap();
match args.command {
TestCommand::Split(cmd) => {
assert_eq!(cmd.output_dir, Some(PathBuf::from("/tmp/output")));
assert_eq!(cmd.blocks_per_file, 50000);
assert_eq!(
cmd.segments,
Some(vec![StaticFileSegment::Headers, StaticFileSegment::Receipts])
);
assert_eq!(cmd.from_block, Some(1000));
assert_eq!(cmd.to_block, Some(500000));
assert!(cmd.dry_run);
assert!(!cmd.in_place);
}
}
}
#[test]
fn parse_split_command_in_place() {
let args =
TestCli::try_parse_from(["test", "split", "--in-place", "--blocks-per-file", "100000"])
.unwrap();
match args.command {
TestCommand::Split(cmd) => {
assert!(cmd.output_dir.is_none());
assert_eq!(cmd.blocks_per_file, 100000);
assert!(cmd.in_place);
assert!(!cmd.skip_verify);
}
}
}
#[test]
fn parse_split_command_in_place_skip_verify() {
let args = TestCli::try_parse_from([
"test",
"split",
"--in-place",
"--skip-verify",
"--blocks-per-file",
"100000",
])
.unwrap();
match args.command {
TestCommand::Split(cmd) => {
assert!(cmd.in_place);
assert!(cmd.skip_verify);
}
}
}
#[test]
fn parse_split_command_output_dir_conflicts_with_in_place() {
let result = TestCli::try_parse_from([
"test",
"split",
"--output-dir",
"/tmp/out",
"--in-place",
"--blocks-per-file",
"100000",
]);
assert!(result.is_err());
}
#[test]
fn parse_split_command_skip_verify_requires_in_place() {
// --skip-verify without --in-place should fail
let result = TestCli::try_parse_from([
"test",
"split",
"--skip-verify",
"--blocks-per-file",
"100000",
]);
assert!(result.is_err(), "--skip-verify should require --in-place");
}
#[test]
fn parse_split_command_all_segments() {
let args = TestCli::try_parse_from([
"test",
"split",
"--output-dir",
"/tmp/out",
"--blocks-per-file",
"10",
"--segments",
"headers,transactions,receipts,transaction-senders,account-change-sets,storage-change-sets",
])
.unwrap();
match args.command {
TestCommand::Split(cmd) => {
let segments = cmd.segments.unwrap();
assert_eq!(segments.len(), 6);
assert!(segments.contains(&StaticFileSegment::Headers));
assert!(segments.contains(&StaticFileSegment::Transactions));
assert!(segments.contains(&StaticFileSegment::Receipts));
assert!(segments.contains(&StaticFileSegment::TransactionSenders));
assert!(segments.contains(&StaticFileSegment::AccountChangeSets));
assert!(segments.contains(&StaticFileSegment::StorageChangeSets));
}
}
}
}

View File

@@ -20,7 +20,10 @@ use reth_provider::{
use reth_revm::database::StateProviderDatabase;
use reth_stages::stages::calculate_gas_used_from_headers;
use std::{
sync::Arc,
sync::{
atomic::{AtomicU64, Ordering},
Arc,
},
time::{Duration, Instant},
};
use tokio::{sync::mpsc, task::JoinSet};
@@ -46,6 +49,10 @@ pub struct Command<C: ChainSpecParser> {
#[arg(long)]
num_tasks: Option<u64>,
/// Number of blocks each worker processes before grabbing the next chunk.
#[arg(long, default_value = "5000")]
blocks_per_chunk: u64,
/// Continues with execution when an invalid block is encountered and collects these blocks.
#[arg(long)]
skip_invalid_blocks: bool,
@@ -92,12 +99,10 @@ impl<C: ChainSpecParser<ChainSpec: EthChainSpec + Hardforks + EthereumHardforks>
std::thread::available_parallelism().map(|n| n.get() as u64).unwrap_or(10)
});
let total_blocks = max_block - min_block;
let total_gas = calculate_gas_used_from_headers(
&provider_factory.static_file_provider(),
min_block..=max_block,
)?;
let blocks_per_task = total_blocks / num_tasks;
let db_at = {
let provider_factory = provider_factory.clone();
@@ -109,18 +114,17 @@ impl<C: ChainSpecParser<ChainSpec: EthChainSpec + Hardforks + EthereumHardforks>
};
let skip_invalid_blocks = self.skip_invalid_blocks;
let blocks_per_chunk = self.blocks_per_chunk;
let (stats_tx, mut stats_rx) = mpsc::unbounded_channel();
let (info_tx, mut info_rx) = mpsc::unbounded_channel();
let cancellation = CancellationToken::new();
let _guard = cancellation.drop_guard();
let mut tasks = JoinSet::new();
for i in 0..num_tasks {
let start_block = min_block + i * blocks_per_task;
let end_block =
if i == num_tasks - 1 { max_block } else { start_block + blocks_per_task };
// Shared counter for work stealing: workers atomically grab the next chunk of blocks.
let next_block = Arc::new(AtomicU64::new(min_block));
// Spawn thread executing blocks
let mut tasks = JoinSet::new();
for _ in 0..num_tasks {
let provider_factory = provider_factory.clone();
let evm_config = components.evm_config().clone();
let consensus = components.consensus().clone();
@@ -128,95 +132,122 @@ impl<C: ChainSpecParser<ChainSpec: EthChainSpec + Hardforks + EthereumHardforks>
let stats_tx = stats_tx.clone();
let info_tx = info_tx.clone();
let cancellation = cancellation.clone();
let next_block = Arc::clone(&next_block);
tasks.spawn_blocking(move || {
let mut executor = evm_config.batch_executor(db_at(start_block - 1));
let mut executor_created = Instant::now();
let executor_lifetime = Duration::from_secs(120);
'blocks: for block in start_block..end_block {
loop {
if cancellation.is_cancelled() {
// exit if the program is being terminated
break
break;
}
let block = provider_factory
.recovered_block(block.into(), TransactionVariant::NoHash)?
.unwrap();
// Atomically grab the next chunk of blocks.
let chunk_start =
next_block.fetch_add(blocks_per_chunk, Ordering::Relaxed);
if chunk_start >= max_block {
break;
}
let chunk_end = (chunk_start + blocks_per_chunk).min(max_block);
let result = match executor.execute_one(&block) {
Ok(result) => result,
Err(err) => {
if skip_invalid_blocks {
executor = evm_config.batch_executor(db_at(block.number()));
let _ = info_tx.send((block, eyre::Report::new(err)));
continue
}
return Err(err.into())
let mut executor = evm_config.batch_executor(db_at(chunk_start - 1));
let mut executor_created = Instant::now();
'blocks: for block in chunk_start..chunk_end {
if cancellation.is_cancelled() {
break;
}
};
if let Err(err) = consensus
.validate_block_post_execution(&block, &result, None)
.wrap_err_with(|| {
format!("Failed to validate block {} {}", block.number(), block.hash())
})
{
let correct_receipts =
provider_factory.receipts_by_block(block.number().into())?.unwrap();
let block = provider_factory
.recovered_block(block.into(), TransactionVariant::NoHash)?
.unwrap();
for (i, (receipt, correct_receipt)) in
result.receipts.iter().zip(correct_receipts.iter()).enumerate()
{
if receipt != correct_receipt {
let tx_hash = block.body().transactions()[i].tx_hash();
error!(
?receipt,
?correct_receipt,
index = i,
?tx_hash,
"Invalid receipt"
);
let expected_gas_used = correct_receipt.cumulative_gas_used() -
if i == 0 {
0
} else {
correct_receipts[i - 1].cumulative_gas_used()
};
let got_gas_used = receipt.cumulative_gas_used() -
if i == 0 {
0
} else {
result.receipts[i - 1].cumulative_gas_used()
};
if got_gas_used != expected_gas_used {
let mismatch = GotExpected {
expected: expected_gas_used,
got: got_gas_used,
};
error!(number=?block.number(), ?mismatch, "Gas usage mismatch");
if skip_invalid_blocks {
executor = evm_config.batch_executor(db_at(block.number()));
let _ = info_tx.send((block, err));
continue 'blocks;
}
return Err(err);
let result = match executor.execute_one(&block) {
Ok(result) => result,
Err(err) => {
if skip_invalid_blocks {
executor =
evm_config.batch_executor(db_at(block.number()));
let _ =
info_tx.send((block, eyre::Report::new(err)));
continue
}
} else {
continue;
return Err(err.into())
}
};
if let Err(err) = consensus
.validate_block_post_execution(&block, &result, None)
.wrap_err_with(|| {
format!(
"Failed to validate block {} {}",
block.number(),
block.hash()
)
})
{
let correct_receipts = provider_factory
.receipts_by_block(block.number().into())?
.unwrap();
for (i, (receipt, correct_receipt)) in
result.receipts.iter().zip(correct_receipts.iter()).enumerate()
{
if receipt != correct_receipt {
let tx_hash =
block.body().transactions()[i].tx_hash();
error!(
?receipt,
?correct_receipt,
index = i,
?tx_hash,
"Invalid receipt"
);
let expected_gas_used =
correct_receipt.cumulative_gas_used() -
if i == 0 {
0
} else {
correct_receipts[i - 1]
.cumulative_gas_used()
};
let got_gas_used = receipt.cumulative_gas_used() -
if i == 0 {
0
} else {
result.receipts[i - 1].cumulative_gas_used()
};
if got_gas_used != expected_gas_used {
let mismatch = GotExpected {
expected: expected_gas_used,
got: got_gas_used,
};
error!(number=?block.number(), ?mismatch, "Gas usage mismatch");
if skip_invalid_blocks {
executor = evm_config
.batch_executor(db_at(block.number()));
let _ = info_tx.send((block, err));
continue 'blocks;
}
return Err(err);
}
} else {
continue;
}
}
return Err(err);
}
let _ = stats_tx.send(block.gas_used());
return Err(err);
}
let _ = stats_tx.send(block.gas_used());
// Reset DB once in a while to avoid OOM or read tx timeouts
if executor.size_hint() > 1_000_000 ||
executor_created.elapsed() > executor_lifetime
{
executor = evm_config.batch_executor(db_at(block.number()));
executor_created = Instant::now();
// Reset DB once in a while to avoid OOM or read tx timeouts
if executor.size_hint() > 1_000_000 ||
executor_created.elapsed() > executor_lifetime
{
executor =
evm_config.batch_executor(db_at(block.number()));
executor_created = Instant::now();
}
}
}

View File

@@ -52,7 +52,7 @@ impl<C: ChainSpecParser<ChainSpec: EthChainSpec + EthereumHardforks>> Command<C>
Comp: CliNodeComponents<N>,
F: FnOnce(Arc<C::ChainSpec>) -> Comp,
{
let Environment { provider_factory, config, .. } =
let Environment { provider_factory, config, data_dir: _ } =
self.env.init::<N>(AccessRights::RW, runtime)?;
let target = self.command.unwind_target(provider_factory.clone())?;

View File

@@ -19,6 +19,7 @@ reth-consensus.workspace = true
reth-primitives-traits.workspace = true
alloy-consensus.workspace = true
alloy-eips.workspace = true
alloy-primitives.workspace = true
[dev-dependencies]
alloy-primitives = { workspace = true, features = ["rand"] }

View File

@@ -2,6 +2,7 @@
use alloy_consensus::{BlockHeader as _, EMPTY_OMMER_ROOT_HASH};
use alloy_eips::{eip4844::DATA_GAS_PER_BLOB, eip7840::BlobParams};
use alloy_primitives::B256;
use reth_chainspec::{EthChainSpec, EthereumHardfork, EthereumHardforks};
use reth_consensus::ConsensusError;
use reth_primitives_traits::{
@@ -141,6 +142,27 @@ pub fn validate_block_pre_execution<B, ChainSpec>(
block: &SealedBlock<B>,
chain_spec: &ChainSpec,
) -> Result<(), ConsensusError>
where
B: Block,
ChainSpec: EthChainSpec + EthereumHardforks,
{
validate_block_pre_execution_with_tx_root(block, chain_spec, None)
}
/// Validate a block without regard for state using an optional pre-computed transaction root.
///
/// - Compares the ommer hash in the block header to the block body
/// - Compares the transactions root in the block header to the block body
/// - Pre-execution transaction validation
///
/// If `transaction_root` is provided, it is used instead of recomputing the transaction trie
/// root from the block body. The caller must ensure this value was derived from
/// `block.body().calculate_tx_root()`.
pub fn validate_block_pre_execution_with_tx_root<B, ChainSpec>(
block: &SealedBlock<B>,
chain_spec: &ChainSpec,
transaction_root: Option<B256>,
) -> Result<(), ConsensusError>
where
B: Block,
ChainSpec: EthChainSpec + EthereumHardforks,
@@ -148,8 +170,14 @@ where
post_merge_hardfork_fields(block, chain_spec)?;
// Check transaction root
if let Err(error) = block.ensure_transaction_root_valid() {
return Err(ConsensusError::BodyTransactionRootDiff(error.into()))
let expected_transaction_root = block.header().transactions_root();
let calculated_transaction_root =
transaction_root.unwrap_or_else(|| block.body().calculate_tx_root());
if calculated_transaction_root != expected_transaction_root {
return Err(ConsensusError::BodyTransactionRootDiff(
GotExpected { got: calculated_transaction_root, expected: expected_transaction_root }
.into(),
))
}
Ok(())
@@ -426,7 +454,7 @@ pub fn validate_against_parent_4844<H: BlockHeader>(
mod tests {
use super::*;
use alloy_consensus::{BlockBody, Header, TxEip4844};
use alloy_eips::eip4895::Withdrawals;
use alloy_eips::{eip4844::DATA_GAS_PER_BLOB, eip4895::Withdrawals};
use alloy_primitives::{Address, Bytes, Signature, U256};
use rand::Rng;
use reth_chainspec::ChainSpecBuilder;
@@ -507,4 +535,66 @@ mod tests {
// Test with custom larger limit - should pass
assert!(validate_header_extra_data(&header_33, 64).is_ok());
}
#[test]
fn precomputed_tx_root_correct_passes() {
let chain_spec = ChainSpecBuilder::mainnet().cancun_activated().build();
let transaction = mock_blob_tx(1, 1);
let tx_root = proofs::calculate_transaction_root(std::slice::from_ref(&transaction));
let header = Header {
base_fee_per_gas: Some(1337),
withdrawals_root: Some(proofs::calculate_withdrawals_root(&[])),
transactions_root: tx_root,
blob_gas_used: Some(DATA_GAS_PER_BLOB),
excess_blob_gas: Some(0),
..Default::default()
};
let body = BlockBody {
transactions: vec![transaction],
ommers: vec![],
withdrawals: Some(Withdrawals::default()),
};
let block = SealedBlock::seal_slow(alloy_consensus::Block { header, body });
// Some(correct_root) should pass just like None
assert!(
validate_block_pre_execution_with_tx_root(&block, &chain_spec, Some(tx_root)).is_ok()
);
assert!(validate_block_pre_execution_with_tx_root(&block, &chain_spec, None).is_ok());
}
#[test]
fn precomputed_tx_root_wrong_fails() {
let chain_spec = ChainSpecBuilder::mainnet().cancun_activated().build();
let transaction = mock_blob_tx(1, 1);
let tx_root = proofs::calculate_transaction_root(std::slice::from_ref(&transaction));
let header = Header {
base_fee_per_gas: Some(1337),
withdrawals_root: Some(proofs::calculate_withdrawals_root(&[])),
transactions_root: tx_root,
blob_gas_used: Some(DATA_GAS_PER_BLOB),
excess_blob_gas: Some(0),
..Default::default()
};
let body = BlockBody {
transactions: vec![transaction],
ommers: vec![],
withdrawals: Some(Withdrawals::default()),
};
let block = SealedBlock::seal_slow(alloy_consensus::Block { header, body });
let wrong_root = B256::repeat_byte(0xff);
assert!(matches!(
validate_block_pre_execution_with_tx_root(&block, &chain_spec, Some(wrong_root))
.unwrap_err(),
ConsensusError::BodyTransactionRootDiff(diff)
if diff.0.got == wrong_root && diff.0.expected == tx_root
));
}
}

View File

@@ -21,6 +21,12 @@ use core::error::Error;
/// When provided to [`FullConsensus::validate_block_post_execution`], this allows skipping
/// the receipt root computation and using the pre-computed values instead.
pub type ReceiptRootBloom = (B256, Bloom);
/// Pre-computed transaction root.
///
/// When provided to [`Consensus::validate_block_pre_execution_with_tx_root`], this allows
/// skipping transaction trie reconstruction from the block body.
pub type TransactionRoot = B256;
use reth_execution_types::BlockExecutionResult;
use reth_primitives_traits::{
constants::{GAS_LIMIT_BOUND_DIVISOR, MAXIMUM_GAS_LIMIT_BLOCK, MINIMUM_GAS_LIMIT},
@@ -78,6 +84,22 @@ pub trait Consensus<B: Block>: HeaderValidator<B::Header> {
///
/// Note: validating blocks does not include other validations of the Consensus
fn validate_block_pre_execution(&self, block: &SealedBlock<B>) -> Result<(), ConsensusError>;
/// Validate a block disregarding world state using an optional pre-computed transaction root.
///
/// If `transaction_root` is provided, the implementation should use the pre-computed
/// transaction root instead of recomputing it from the block body. The value must have been
/// derived from `block.body().calculate_tx_root()`.
///
/// By default this falls back to [`Self::validate_block_pre_execution`].
fn validate_block_pre_execution_with_tx_root(
&self,
block: &SealedBlock<B>,
transaction_root: Option<TransactionRoot>,
) -> Result<(), ConsensusError> {
let _ = transaction_root;
self.validate_block_pre_execution(block)
}
}
/// `HeaderValidator` is a protocol that validates headers and their relationships.

View File

@@ -38,7 +38,6 @@ reth-ethereum-primitives.workspace = true
reth-cli-commands.workspace = true
reth-config.workspace = true
reth-consensus.workspace = true
reth-primitives.workspace = true
reth-db-common.workspace = true
reth-primitives-traits.workspace = true

View File

@@ -275,8 +275,9 @@ mod tests {
use crate::test_rlp_utils::{create_fcu_json, generate_test_blocks, write_blocks_to_rlp};
use reth_chainspec::{ChainSpecBuilder, MAINNET};
use reth_db::mdbx::DatabaseArguments;
use reth_ethereum_primitives::Block;
use reth_payload_builder::EthPayloadBuilderAttributes;
use reth_primitives::SealedBlock;
use reth_primitives_traits::SealedBlock;
use reth_provider::{
test_utils::MockNodeTypesWithDB, BlockHashReader, BlockNumReader, BlockReaderIdExt,
};
@@ -448,7 +449,7 @@ mod tests {
chain_spec: &ChainSpec,
block_count: u64,
temp_dir: &Path,
) -> (Vec<SealedBlock>, PathBuf) {
) -> (Vec<SealedBlock<Block>>, PathBuf) {
let test_blocks = generate_test_blocks(chain_spec, block_count);
assert_eq!(
test_blocks.len(),

View File

@@ -6,14 +6,13 @@ use alloy_primitives::{Address, B256, B64, U256};
use alloy_rlp::Encodable;
use reth_chainspec::{ChainSpec, EthereumHardforks};
use reth_ethereum_primitives::{Block, BlockBody};
use reth_primitives::SealedBlock;
use reth_primitives_traits::Block as BlockTrait;
use reth_primitives_traits::{Block as BlockTrait, SealedBlock};
use std::{io::Write, path::Path};
use tracing::debug;
/// Generate test blocks for a given chain spec
pub fn generate_test_blocks(chain_spec: &ChainSpec, count: u64) -> Vec<SealedBlock> {
let mut blocks: Vec<SealedBlock> = Vec::new();
pub fn generate_test_blocks(chain_spec: &ChainSpec, count: u64) -> Vec<SealedBlock<Block>> {
let mut blocks: Vec<SealedBlock<Block>> = Vec::new();
let genesis_header = chain_spec.sealed_genesis_header();
let mut parent_hash = genesis_header.hash();
let mut parent_number = genesis_header.number();
@@ -139,7 +138,7 @@ pub fn generate_test_blocks(chain_spec: &ChainSpec, count: u64) -> Vec<SealedBlo
}
/// Write blocks to RLP file
pub fn write_blocks_to_rlp(blocks: &[SealedBlock], path: &Path) -> std::io::Result<()> {
pub fn write_blocks_to_rlp(blocks: &[SealedBlock<Block>], path: &Path) -> std::io::Result<()> {
let mut file = std::fs::File::create(path)?;
let mut total_bytes = 0;
@@ -173,7 +172,7 @@ pub fn write_blocks_to_rlp(blocks: &[SealedBlock], path: &Path) -> std::io::Resu
}
/// Create FCU JSON for the tip of the chain
pub fn create_fcu_json(tip: &SealedBlock) -> serde_json::Value {
pub fn create_fcu_json(tip: &SealedBlock<Block>) -> serde_json::Value {
serde_json::json!({
"params": [{
"headBlockHash": format!("0x{:x}", tip.hash()),

View File

@@ -30,11 +30,7 @@ fn default_account_worker_count() -> usize {
}
/// The size of proof targets chunk to spawn in one multiproof calculation.
pub const DEFAULT_MULTIPROOF_TASK_CHUNK_SIZE: usize = 60;
/// The size of proof targets chunk optimized for small blocks (≤20M gas used).
/// Benchmarks: <https://gist.github.com/yongkangc/fda9c24846f0ba891376bcf81b002008>
pub const SMALL_BLOCK_MULTIPROOF_CHUNK_SIZE: usize = 30;
pub const DEFAULT_MULTIPROOF_TASK_CHUNK_SIZE: usize = 5;
/// Gas threshold below which the small block chunk size is used.
pub const SMALL_BLOCK_GAS_THRESHOLD: u64 = 20_000_000;
@@ -127,8 +123,6 @@ pub struct TreeConfig {
cross_block_cache_size: usize,
/// Whether the host has enough parallelism to run state root task.
has_enough_parallelism: bool,
/// Whether multiproof task should chunk proof targets.
multiproof_chunking_enabled: bool,
/// Multiproof task chunk size for proof targets.
multiproof_chunk_size: usize,
/// Number of reserved CPU cores for non-reth processes
@@ -187,7 +181,6 @@ impl Default for TreeConfig {
state_provider_metrics: false,
cross_block_cache_size: DEFAULT_CROSS_BLOCK_CACHE_SIZE,
has_enough_parallelism: has_enough_parallelism(),
multiproof_chunking_enabled: true,
multiproof_chunk_size: DEFAULT_MULTIPROOF_TASK_CHUNK_SIZE,
reserved_cpu_cores: DEFAULT_RESERVED_CPU_CORES,
precompile_cache_disabled: false,
@@ -221,7 +214,6 @@ impl TreeConfig {
state_provider_metrics: bool,
cross_block_cache_size: usize,
has_enough_parallelism: bool,
multiproof_chunking_enabled: bool,
multiproof_chunk_size: usize,
reserved_cpu_cores: usize,
precompile_cache_disabled: bool,
@@ -248,7 +240,6 @@ impl TreeConfig {
state_provider_metrics,
cross_block_cache_size,
has_enough_parallelism,
multiproof_chunking_enabled,
multiproof_chunk_size,
reserved_cpu_cores,
precompile_cache_disabled,
@@ -290,11 +281,6 @@ impl TreeConfig {
self.max_execute_block_batch_size
}
/// Return whether the multiproof task chunking is enabled.
pub const fn multiproof_chunking_enabled(&self) -> bool {
self.multiproof_chunking_enabled
}
/// Return the multiproof task chunk size.
pub const fn multiproof_chunk_size(&self) -> usize {
self.multiproof_chunk_size
@@ -458,15 +444,6 @@ impl TreeConfig {
self
}
/// Setter for whether multiproof task should chunk proof targets.
pub const fn with_multiproof_chunking_enabled(
mut self,
multiproof_chunking_enabled: bool,
) -> Self {
self.multiproof_chunking_enabled = multiproof_chunking_enabled;
self
}
/// Setter for multiproof task chunk size for proof targets.
pub const fn with_multiproof_chunk_size(mut self, multiproof_chunk_size: usize) -> Self {
self.multiproof_chunk_size = multiproof_chunk_size;

View File

@@ -12,7 +12,8 @@ use rand::Rng;
use reth_chainspec::ChainSpec;
use reth_db_common::init::init_genesis;
use reth_engine_tree::tree::{
precompile_cache::PrecompileCacheMap, PayloadProcessor, StateProviderBuilder, TreeConfig,
precompile_cache::PrecompileCacheMap, ExecutionEnv, PayloadProcessor, StateProviderBuilder,
TreeConfig,
};
use reth_ethereum_primitives::TransactionSigned;
use reth_evm::OnStateHook;
@@ -230,7 +231,7 @@ fn bench_state_root(c: &mut Criterion) {
|(genesis_hash, mut payload_processor, provider, state_updates)| {
black_box({
let mut handle = payload_processor.spawn(
Default::default(),
ExecutionEnv::test_default(),
(
Vec::<
Result<

View File

@@ -351,14 +351,6 @@ impl<S: StateProvider, const PREWARM: bool> StateProvider for CachedStateProvide
self.state_provider.storage(account, storage_key)
}
}
fn storage_by_hashed_key(
&self,
address: Address,
hashed_storage_key: StorageKey,
) -> ProviderResult<Option<StorageValue>> {
self.state_provider.storage_by_hashed_key(address, hashed_storage_key)
}
}
impl<S: BytecodeReader, const PREWARM: bool> BytecodeReader for CachedStateProvider<S, PREWARM> {

View File

@@ -199,17 +199,6 @@ impl<S: StateProvider> StateProvider for InstrumentedStateProvider<S> {
self.record_storage_fetch(start.elapsed());
res
}
fn storage_by_hashed_key(
&self,
address: Address,
hashed_storage_key: StorageKey,
) -> ProviderResult<Option<StorageValue>> {
let start = Instant::now();
let res = self.state_provider.storage_by_hashed_key(address, hashed_storage_key);
self.record_storage_fetch(start.elapsed());
res
}
}
impl<S: BytecodeReader> BytecodeReader for InstrumentedStateProvider<S> {

View File

@@ -38,7 +38,7 @@ use reth_provider::{
};
use reth_revm::database::StateProviderDatabase;
use reth_stages_api::ControlFlow;
use reth_tasks::spawn_os_thread;
use reth_tasks::{spawn_os_thread, utils::increase_thread_priority};
use reth_trie_db::ChangesetCache;
use revm::interpreter::debug_unreachable;
use state::TreeState;
@@ -420,7 +420,10 @@ where
use_hashed_state,
);
let incoming = task.incoming_tx.clone();
spawn_os_thread("engine", || task.run());
spawn_os_thread("engine", || {
increase_thread_priority();
task.run()
});
(incoming, outgoing)
}
@@ -1413,7 +1416,7 @@ where
// Spawn a background task to trigger computation so it's ready when the next payload
// arrives.
if let Some(overlay) = self.state.tree_state.prepare_canonical_overlay() {
rayon::spawn(move || {
tokio::task::spawn_blocking(move || {
let _ = overlay.get();
});
}

View File

@@ -20,7 +20,6 @@ use multiproof::*;
use parking_lot::RwLock;
use prewarm::PrewarmMetrics;
use rayon::prelude::*;
use reth_engine_primitives::{SMALL_BLOCK_GAS_THRESHOLD, SMALL_BLOCK_MULTIPROOF_CHUNK_SIZE};
use reth_evm::{
block::ExecutableTxParts,
execute::{ExecutableTxFor, WithTxEnv},
@@ -33,7 +32,7 @@ use reth_provider::{
BlockExecutionOutput, BlockReader, DatabaseProviderROFactory, StateProviderFactory, StateReader,
};
use reth_revm::{db::BundleState, state::EvmState};
use reth_tasks::{ForEachOrdered, Runtime};
use reth_tasks::{utils::increase_thread_priority, ForEachOrdered, Runtime};
use reth_trie::{hashed_cursor::HashedCursorFactory, trie_cursor::TrieCursorFactory};
use reth_trie_parallel::{
proof_task::{ProofTaskCtx, ProofWorkerHandle},
@@ -287,7 +286,7 @@ where
let parent_state_root = env.parent_state_root;
let transaction_count = env.transaction_count;
let chunk_size = Self::adaptive_chunk_size(config, env.gas_used);
let chunk_size = config.multiproof_chunk_size();
let prewarm_handle = self.spawn_caching_with(
env,
prewarm_rx,
@@ -369,24 +368,6 @@ where
/// waiting for rayon scheduling.
const PARALLEL_PREFETCH_COUNT: usize = 4;
/// Returns the multiproof chunk size adapted to the block's gas usage.
///
/// For blocks with ≤20M gas used, a smaller chunk size (30) yields better throughput.
/// For larger blocks, the configured default chunk size is used.
const fn adaptive_chunk_size(config: &TreeConfig, gas_used: u64) -> Option<usize> {
if !config.multiproof_chunking_enabled() {
return None;
}
let size = if gas_used > 0 && gas_used <= SMALL_BLOCK_GAS_THRESHOLD {
SMALL_BLOCK_MULTIPROOF_CHUNK_SIZE
} else {
config.multiproof_chunk_size()
};
Some(size)
}
/// Spawns a task advancing transaction env iterator and streaming updates through a channel.
///
/// For blocks with fewer than [`Self::SMALL_BLOCK_TX_THRESHOLD`] transactions, uses
@@ -416,9 +397,7 @@ where
transaction_count,
"using sequential sig recovery for small block"
);
self.executor.spawn_blocking(move || {
let _enter =
debug_span!(target: "engine::tree::payload_processor", "tx_iterator").entered();
self.executor.spawn_blocking_named("tx-iterator", move || {
let (transactions, convert) = transactions.into_parts();
convert_serial(transactions.into_iter(), &convert, &prewarm_tx, &execute_tx);
});
@@ -430,9 +409,7 @@ where
// few transactions are recovered sequentially and sent immediately before
// entering the parallel iterator for the remainder.
let prefetch = Self::PARALLEL_PREFETCH_COUNT.min(transaction_count);
self.executor.spawn_blocking(move || {
let _enter =
debug_span!(target: "engine::tree::payload_processor", "tx_iterator").entered();
self.executor.spawn_blocking_named("tx-iterator", move || {
let (transactions, convert) = transactions.into_parts();
let mut all: Vec<_> = transactions.into_iter().collect();
let rest = all.split_off(prefetch.min(all.len()));
@@ -447,15 +424,17 @@ where
.map(|(i, tx)| {
let idx = i + prefetch;
let tx = convert.convert(tx);
tx.map(|tx| {
let tx = tx.map(|tx| {
let (tx_env, tx) = tx.into_parts();
let tx = WithTxEnv { tx_env, tx: Arc::new(tx) };
let _ = prewarm_tx.send((idx, tx.clone()));
tx
})
});
(idx, tx)
})
.for_each_ordered(|tx| {
.for_each_ordered(|(idx, tx)| {
let _ = execute_tx.send(tx);
debug!(target: "engine::tree::payload_processor", idx, "yielded transaction");
});
});
}
@@ -551,7 +530,7 @@ where
state_root_tx: mpsc::Sender<Result<StateRootComputeOutcome, ParallelStateRootError>>,
from_multi_proof: CrossbeamReceiver<MultiProofMessage>,
parent_state_root: B256,
chunk_size: Option<usize>,
chunk_size: usize,
) {
let preserved_sparse_trie = self.sparse_state_trie.clone();
let trie_metrics = self.trie_metrics.clone();
@@ -562,6 +541,8 @@ where
let parent_span = Span::current();
self.executor.spawn_blocking_named("sparse-trie", move || {
increase_thread_priority();
let _enter = debug_span!(target: "engine::tree::payload_processor", parent: parent_span, "sparse_trie_task")
.entered();
@@ -603,8 +584,6 @@ where
);
let result = task.run();
// Capture the computed state_root before sending the result
let computed_state_root = result.as_ref().ok().map(|outcome| outcome.state_root);
// Acquire the guard before sending the result to prevent a race condition:
// Without this, the next block could start after send() but before store(),
@@ -613,6 +592,7 @@ where
// block's take() blocks until we've stored the trie for reuse.
let mut guard = preserved_sparse_trie.lock();
let task_result = result.as_ref().ok().cloned();
// Send state root computation result - next block may start but will block on take()
if state_root_tx.send(result).is_err() {
// Receiver dropped - payload was likely invalid or cancelled.
@@ -636,7 +616,7 @@ where
// A failed computation may have left the trie in a partially updated state.
let _enter =
debug_span!(target: "engine::tree::payload_processor", "preserve").entered();
let deferred = if let Some(state_root) = computed_state_root {
let deferred = if let Some(result) = task_result {
let start = Instant::now();
let (trie, deferred) = task.into_trie_for_reuse(
prune_depth,
@@ -644,11 +624,12 @@ where
SPARSE_TRIE_MAX_NODES_SHRINK_CAPACITY,
SPARSE_TRIE_MAX_VALUES_SHRINK_CAPACITY,
disable_cache_pruning,
&result.trie_updates,
);
trie_metrics
.into_trie_for_reuse_duration_histogram
.record(start.elapsed().as_secs_f64());
guard.store(PreservedSparseTrie::anchored(trie, state_root));
guard.store(PreservedSparseTrie::anchored(trie, result.state_root));
deferred
} else {
debug!(
@@ -740,6 +721,7 @@ fn convert_serial<RawTx, Tx, TxEnv, InnerTx, Recovered, Err, C>(
let _ = prewarm_tx.send((idx, tx.clone()));
}
let _ = execute_tx.send(tx);
debug!(target: "engine::tree::payload_processor", idx, "yielded transaction");
}
}
@@ -1019,10 +1001,16 @@ impl PayloadExecutionCache {
/// Updates the cache with a closure that has exclusive access to the guard.
/// This ensures that all cache operations happen atomically.
///
/// Callers must not mutate the *underlying* [`ExecutionCache`] data (e.g. via
/// `SavedCache::clear`) while other tasks may hold clones of the same
/// `SavedCache`. Swapping the slot value (`*cached = Some(..)` / `*cached = None`)
/// is always safe because existing clones retain their own `Arc` references.
/// ## CRITICAL SAFETY REQUIREMENT
///
/// **Before calling this method, you MUST ensure there are no other active cache users.**
/// This includes:
/// - No running [`PrewarmCacheTask`] instances that could write to the cache
/// - No concurrent transactions that might access the cached state
/// - All prewarming operations must be completed or cancelled
///
/// Violating this requirement can result in cache corruption, incorrect state data,
/// and potential consensus failures.
pub fn update_with_guard<F>(&self, update_fn: F)
where
F: FnOnce(&mut Option<SavedCache>),
@@ -1068,11 +1056,13 @@ pub struct ExecutionEnv<Evm: ConfigureEvm> {
pub withdrawals: Option<Vec<Withdrawal>>,
}
impl<Evm: ConfigureEvm> Default for ExecutionEnv<Evm>
impl<Evm: ConfigureEvm> ExecutionEnv<Evm>
where
EvmEnvFor<Evm>: Default,
{
fn default() -> Self {
/// Creates a new [`ExecutionEnv`] with default values for testing.
#[cfg(any(test, feature = "test-utils"))]
pub fn test_default() -> Self {
Self {
evm_env: Default::default(),
hash: Default::default(),
@@ -1090,7 +1080,7 @@ mod tests {
use super::PayloadExecutionCache;
use crate::tree::{
cached_state::{CachedStateMetrics, ExecutionCache, SavedCache},
payload_processor::{evm_state_to_hashed_post_state, PayloadProcessor},
payload_processor::{evm_state_to_hashed_post_state, ExecutionEnv, PayloadProcessor},
precompile_cache::PrecompileCacheMap,
StateProviderBuilder, TreeConfig,
};
@@ -1365,7 +1355,7 @@ mod tests {
let provider_factory = BlockchainProvider::new(factory).unwrap();
let mut handle = payload_processor.spawn(
Default::default(),
ExecutionEnv::test_default(),
(
Vec::<Result<Recovered<TransactionSigned>, core::convert::Infallible>>::new(),
std::convert::identity,

View File

@@ -197,7 +197,7 @@ pub(crate) struct MultiProofTaskMetrics {
pub(crate) fn dispatch_with_chunking<T, I>(
items: T,
chunking_len: usize,
chunk_size: Option<usize>,
chunk_size: usize,
max_targets_for_chunking: usize,
available_account_workers: usize,
available_storage_workers: usize,
@@ -211,10 +211,7 @@ where
available_account_workers > 1 ||
available_storage_workers > 1;
if should_chunk &&
let Some(chunk_size) = chunk_size &&
chunking_len > chunk_size
{
if should_chunk && chunking_len > chunk_size {
let mut num_chunks = 0usize;
for chunk in chunker(items, chunk_size) {
dispatch(chunk);

View File

@@ -20,9 +20,8 @@ use crate::tree::{
use alloy_consensus::transaction::TxHashRef;
use alloy_eip7928::BlockAccessList;
use alloy_eips::eip4895::Withdrawal;
use alloy_evm::Database;
use alloy_primitives::{keccak256, StorageKey, B256};
use crossbeam_channel::{Receiver as CrossbeamReceiver, Sender as CrossbeamSender};
use crossbeam_channel::Sender as CrossbeamSender;
use metrics::{Counter, Gauge, Histogram};
use rayon::prelude::*;
use reth_evm::{execute::ExecutableTxFor, ConfigureEvm, Evm, EvmFor, RecoveredTx, SpecFor};
@@ -33,11 +32,11 @@ use reth_provider::{
StateReader,
};
use reth_revm::{database::StateProviderDatabase, state::EvmState};
use reth_tasks::Runtime;
use reth_tasks::{pool::WorkerPool, Runtime};
use reth_trie_parallel::targets_v2::MultiProofTargetsV2;
use std::sync::{
atomic::{AtomicBool, Ordering},
mpsc::{self, channel, Receiver, Sender, SyncSender},
mpsc::{self, channel, Receiver, Sender},
Arc,
};
use tracing::{debug, debug_span, instrument, trace, warn, Span};
@@ -54,15 +53,6 @@ pub enum PrewarmMode<Tx> {
Skipped,
}
/// A wrapper for transactions that includes their index in the block.
#[derive(Clone)]
struct IndexedTransaction<Tx> {
/// The transaction index in the block.
index: usize,
/// The wrapped transaction.
tx: Tx,
}
/// A task that is responsible for caching and prewarming the cache by executing transactions
/// individually in parallel.
///
@@ -122,91 +112,150 @@ where
)
}
/// Spawns all pending transactions as blocking tasks by first chunking them.
/// Streams pending transactions and executes them in parallel on the prewarming pool.
///
/// For Optimism chains, special handling is applied to the first transaction if it's a
/// deposit transaction (type 0x7E/126) which sets critical metadata that affects all
/// subsequent transactions in the block.
fn spawn_all<Tx>(
/// Kicks off EVM init on every pool thread, then uses `in_place_scope` to dispatch
/// transactions as they arrive and wait for all spawned tasks to complete before
/// clearing per-thread state. Workers that start via work-stealing lazily initialise
/// their EVM state on first access via [`get_or_init`](reth_tasks::pool::Worker::get_or_init).
fn spawn_txs_prewarm<Tx>(
&self,
pending: mpsc::Receiver<(usize, Tx)>,
actions_tx: Sender<PrewarmTaskEvent<N::Receipt>>,
to_multi_proof: Option<CrossbeamSender<MultiProofMessage>>,
) where
Tx: ExecutableTxFor<Evm> + Clone + Send + 'static,
Tx: ExecutableTxFor<Evm> + Send + 'static,
{
let executor = self.executor.clone();
let ctx = self.ctx.clone();
let span = Span::current();
self.executor.spawn_blocking_named("prewarm-spawn", move || {
let _enter = debug_span!(target: "engine::tree::payload_processor::prewarm", parent: span, "spawn_all").entered();
self.executor.spawn_blocking_named("prewarm-txs", move || {
let _enter = debug_span!(
target: "engine::tree::payload_processor::prewarm",
parent: span,
"prewarm_txs"
)
.entered();
let pool_threads = executor.prewarming_pool().current_num_threads();
// Don't spawn more workers than transactions. When transaction_count is 0
// (unknown), use all pool threads.
let workers_needed = if ctx.env.transaction_count > 0 {
ctx.env.transaction_count.min(pool_threads)
} else {
pool_threads
};
let ctx = &ctx;
let pool = executor.prewarming_pool();
let (done_tx, done_rx) = mpsc::sync_channel(workers_needed);
// Spawn workers
let tx_sender = ctx.clone().spawn_workers(workers_needed, &executor, to_multi_proof.clone(), done_tx.clone());
// Distribute transactions to workers
let mut tx_count = 0usize;
while let Ok((tx_index, tx)) = pending.recv() {
// Stop distributing if termination was requested
if ctx.terminate_execution.load(Ordering::Relaxed) {
trace!(
target: "engine::tree::payload_processor::prewarm",
"Termination requested, stopping transaction distribution"
);
break;
let to_multi_proof = to_multi_proof.as_ref();
pool.in_place_scope(|s| {
s.spawn(|_| {
pool.init::<PrewarmEvmState<Evm>>(|_| ctx.evm_for_ctx());
});
while let Ok((index, tx)) = pending.recv() {
if ctx.terminate_execution.load(Ordering::Relaxed) {
trace!(
target: "engine::tree::payload_processor::prewarm",
"Termination requested, stopping transaction distribution"
);
break;
}
tx_count += 1;
let parent_span = Span::current();
s.spawn(move |_| {
let _enter = debug_span!(
target: "engine::tree::payload_processor::prewarm",
parent: parent_span,
"prewarm_tx",
i = index,
)
.entered();
Self::transact_worker(ctx, index, tx, to_multi_proof);
});
}
let indexed_tx = IndexedTransaction { index: tx_index, tx };
// Send withdrawal prefetch targets after all transactions dispatched
if let Some(to_multi_proof) = to_multi_proof &&
let Some(withdrawals) = &ctx.env.withdrawals &&
!withdrawals.is_empty()
{
let targets = multiproof_targets_from_withdrawals(withdrawals);
let _ = to_multi_proof.send(MultiProofMessage::PrefetchProofs(targets));
}
});
// Send transaction to the workers
// Ignore send errors: workers listen to terminate_execution and may
// exit early when signaled.
let _ = tx_sender.send(indexed_tx);
tx_count += 1;
}
// Send withdrawal prefetch targets after all transactions have been distributed
if let Some(to_multi_proof) = to_multi_proof
&& let Some(withdrawals) = &ctx.env.withdrawals
&& !withdrawals.is_empty()
{
let targets = multiproof_targets_from_withdrawals(withdrawals);
let _ = to_multi_proof
.send(MultiProofMessage::PrefetchProofs(targets));
}
// drop sender and wait for all tasks to finish
drop(done_tx);
drop(tx_sender);
while done_rx.recv().is_ok() {}
// All tasks are done — clear per-thread EVM state for the next block.
pool.clear();
let _ = actions_tx
.send(PrewarmTaskEvent::FinishedTxExecution { executed_transactions: tx_count });
});
}
/// Executes a single prewarm transaction on the current pool thread's EVM.
///
/// Lazily initialises per-thread [`PrewarmEvmState`] via
/// [`get_or_init`](reth_tasks::pool::Worker::get_or_init) on first access.
fn transact_worker<Tx>(
ctx: &PrewarmContext<N, P, Evm>,
index: usize,
tx: Tx,
to_multi_proof: Option<&CrossbeamSender<MultiProofMessage>>,
) where
Tx: ExecutableTxFor<Evm>,
{
WorkerPool::with_worker_mut(|worker| {
let Some((evm, metrics, terminate_execution)) =
worker.get_or_init::<PrewarmEvmState<Evm>>(|| ctx.evm_for_ctx()).as_mut()
else {
return;
};
if terminate_execution.load(Ordering::Relaxed) {
return;
}
let start = Instant::now();
let (tx_env, tx) = tx.into_parts();
let res = match evm.transact(tx_env) {
Ok(res) => res,
Err(err) => {
trace!(
target: "engine::tree::payload_processor::prewarm",
%err,
tx_hash=%tx.tx().tx_hash(),
sender=%tx.signer(),
"Error when executing prewarm transaction",
);
metrics.transaction_errors.increment(1);
return;
}
};
metrics.execution_duration.record(start.elapsed());
if terminate_execution.load(Ordering::Relaxed) {
return;
}
if index > 0 {
let (targets, storage_targets) = multiproof_targets_from_state(res.state);
metrics.prefetch_storage_targets.record(storage_targets as f64);
if let Some(to_multi_proof) = to_multi_proof {
let _ = to_multi_proof.send(MultiProofMessage::PrefetchProofs(targets));
}
}
metrics.total_runtime.record(start.elapsed());
});
}
/// This method calls `ExecutionCache::update_with_guard` which requires exclusive access.
/// It should only be called after ensuring that:
/// 1. All prewarming tasks have completed execution
/// 2. No other concurrent operations are accessing the cache
///
/// Saves the warmed caches back into the shared slot after prewarming completes.
///
/// Waits for block validation without any lock held, then only on success inserts
/// state and publishes under a brief write lock. This avoids the ~100ms+ lock hold
/// that previously blocked concurrent readers during `valid_block_rx.recv()`.
///
/// The ordering is critical: `insert_state()` mutates the shared fixed-caches
/// in-place while the usage guard is still held (keeping `is_available() == false`),
/// then `split()` releases the guard and publishes the new cache atomically.
/// This consumes the `SavedCache` held by the task, which releases its usage guard and allows
/// the new, warmed cache to be inserted.
///
/// This method is called from `run()` only after all execution tasks are complete.
#[instrument(level = "debug", target = "engine::tree::payload_processor::prewarm", skip_all)]
@@ -223,36 +272,36 @@ where
if let Some(saved_cache) = saved_cache {
debug!(target: "engine::caching", parent_hash=?hash, "Updating execution cache");
// Perform all cache operations atomically under the lock
execution_cache.update_with_guard(|cached| {
// consumes the `SavedCache` held by the prewarming task, which releases its usage
// guard
let (caches, cache_metrics, disable_cache_metrics) = saved_cache.split();
let new_cache = SavedCache::new(hash, caches, cache_metrics)
.with_disable_cache_metrics(disable_cache_metrics);
// Wait for state root validation WITHOUT holding the cache lock.
// This is the key optimization: the original code held the lock across this
// blocking recv(), which blocked the next block's prewarming from accessing
// the cache for ~100ms+.
if valid_block_rx.recv().is_err() {
debug!(target: "engine::caching", parent_hash=?hash, "skipped cache publish on invalid block");
return;
}
// Block is valid — mutate caches while the usage guard is still held
// (keeping is_available() == false) so no concurrent reader can observe
// the cache mid-mutation via get_cache_for().
if saved_cache.cache().insert_state(&execution_outcome.state).is_err() {
execution_cache.update_with_guard(|cached| {
// Insert state into cache while holding the lock
// Access the BundleState through the shared ExecutionOutcome
if new_cache.cache().insert_state(&execution_outcome.state).is_err() {
// Clear the cache on error to prevent having a polluted cache
*cached = None;
});
debug!(target: "engine::caching", "cleared execution cache on update error");
} else {
saved_cache.update_metrics();
debug!(target: "engine::caching", "cleared execution cache on update error");
return;
}
// Now consume the SavedCache (releasing the usage guard) and publish
// the new cache under a brief lock.
execution_cache.update_with_guard(|cached| {
let (caches, cache_metrics, disable_cache_metrics) = saved_cache.split();
let new_cache = SavedCache::new(hash, caches, cache_metrics)
.with_disable_cache_metrics(disable_cache_metrics);
new_cache.update_metrics();
if valid_block_rx.recv().is_ok() {
// Replace the shared cache with the new one; the previous cache (if any) is
// dropped.
*cached = Some(new_cache);
});
}
} else {
// Block was invalid; caches were already mutated by insert_state above,
// so we must clear to prevent using polluted state
*cached = None;
debug!(target: "engine::caching", "cleared execution cache on invalid block");
}
});
let elapsed = start.elapsed();
debug!(target: "engine::caching", parent_hash=?hash, elapsed=?elapsed, "Updated execution cache");
@@ -370,12 +419,12 @@ where
)]
pub fn run<Tx>(self, mode: PrewarmMode<Tx>, actions_tx: Sender<PrewarmTaskEvent<N::Receipt>>)
where
Tx: ExecutableTxFor<Evm> + Clone + Send + 'static,
Tx: ExecutableTxFor<Evm> + Send + 'static,
{
// Spawn execution tasks based on mode
match mode {
PrewarmMode::Transactions(pending) => {
self.spawn_all(pending, actions_tx, self.to_multi_proof.clone());
self.spawn_txs_prewarm(pending, actions_tx, self.to_multi_proof.clone());
}
PrewarmMode::BlockAccessList(bal) => {
self.run_bal_prewarm(bal, actions_tx);
@@ -454,27 +503,24 @@ where
pub precompile_cache_map: PrecompileCacheMap<SpecFor<Evm>>,
}
/// Per-thread EVM state initialised by [`PrewarmContext::evm_for_ctx`] and stored in
/// [`WorkerPool`] workers via [`Worker::get_or_init`](reth_tasks::pool::Worker::get_or_init).
type PrewarmEvmState<Evm> = Option<(
EvmFor<Evm, StateProviderDatabase<reth_provider::StateProviderBox>>,
PrewarmMetrics,
Arc<AtomicBool>,
)>;
impl<N, P, Evm> PrewarmContext<N, P, Evm>
where
N: NodePrimitives,
P: BlockReader + StateProviderFactory + StateReader + Clone + 'static,
Evm: ConfigureEvm<Primitives = N> + 'static,
{
/// Splits this context into an evm, metrics, and the atomic bool for terminating execution.
/// Creates a per-thread EVM, metrics handle, and termination flag for prewarming.
#[instrument(level = "debug", target = "engine::tree::payload_processor::prewarm", skip_all)]
fn evm_for_ctx(self) -> Option<(EvmFor<Evm, impl Database>, PrewarmMetrics, Arc<AtomicBool>)> {
let Self {
env,
evm_config,
saved_cache,
provider,
metrics,
terminate_execution,
precompile_cache_disabled,
precompile_cache_map,
} = self;
let mut state_provider = match provider.build() {
fn evm_for_ctx(&self) -> PrewarmEvmState<Evm> {
let mut state_provider = match self.provider.build() {
Ok(provider) => provider,
Err(err) => {
trace!(
@@ -487,7 +533,7 @@ where
};
// Use the caches to create a new provider with caching
if let Some(saved_cache) = saved_cache {
if let Some(saved_cache) = &self.saved_cache {
let caches = saved_cache.cache().clone();
let cache_metrics = saved_cache.metrics().clone();
state_provider =
@@ -496,7 +542,7 @@ where
let state_provider = StateProviderDatabase::new(state_provider);
let mut evm_env = env.evm_env;
let mut evm_env = self.env.evm_env.clone();
// we must disable the nonce check so that we can execute the transaction even if the nonce
// doesn't match what's on chain.
@@ -508,130 +554,21 @@ where
// create a new executor and disable nonce checks in the env
let spec_id = *evm_env.spec_id();
let mut evm = evm_config.evm_with_env(state_provider, evm_env);
let mut evm = self.evm_config.evm_with_env(state_provider, evm_env);
if !precompile_cache_disabled {
if !self.precompile_cache_disabled {
// Only cache pure precompiles to avoid issues with stateful precompiles
evm.precompiles_mut().map_pure_precompiles(|address, precompile| {
CachedPrecompile::wrap(
precompile,
precompile_cache_map.cache_for_address(*address),
self.precompile_cache_map.cache_for_address(*address),
spec_id,
None, // No metrics for prewarm
)
});
}
Some((evm, metrics, terminate_execution))
}
/// Accepts a [`CrossbeamReceiver`] of transactions and a handle to prewarm task. Executes
/// transactions and streams [`MultiProofMessage::PrefetchProofs`] messages for each
/// transaction.
///
/// This function processes transactions sequentially from the receiver and emits outcome events
/// via the provided sender. Execution errors are logged and tracked but do not stop the batch
/// processing unless the task is explicitly cancelled.
///
/// Note: There are no ordering guarantees; this does not reflect the state produced by
/// sequential execution.
fn transact_batch<Tx>(
self,
txs: CrossbeamReceiver<IndexedTransaction<Tx>>,
to_multi_proof: Option<CrossbeamSender<MultiProofMessage>>,
done_tx: SyncSender<()>,
) where
Tx: ExecutableTxFor<Evm>,
{
let Some((mut evm, metrics, terminate_execution)) = self.evm_for_ctx() else { return };
while let Ok(IndexedTransaction { index, tx }) = txs.recv() {
let _enter = debug_span!(
target: "engine::tree::payload_processor::prewarm",
"prewarm tx",
i=index,
)
.entered();
// create the tx env
let start = Instant::now();
// If the task was cancelled, stop execution, and exit.
if terminate_execution.load(Ordering::Relaxed) {
break
}
let (tx_env, tx) = tx.into_parts();
let res = match evm.transact(tx_env) {
Ok(res) => res,
Err(err) => {
trace!(
target: "engine::tree::payload_processor::prewarm",
%err,
tx_hash=%tx.tx().tx_hash(),
sender=%tx.signer(),
"Error when executing prewarm transaction",
);
// Track transaction execution errors
metrics.transaction_errors.increment(1);
// skip error because we can ignore these errors and continue with the next tx
continue
}
};
metrics.execution_duration.record(start.elapsed());
// If the task was cancelled, stop execution, and exit.
if terminate_execution.load(Ordering::Relaxed) {
break
}
// Only send outcome for transactions after the first txn
// as the main execution will be just as fast
if index > 0 {
let (targets, storage_targets) = multiproof_targets_from_state(res.state);
metrics.prefetch_storage_targets.record(storage_targets as f64);
if let Some(to_multi_proof) = &to_multi_proof {
let _ = to_multi_proof.send(MultiProofMessage::PrefetchProofs(targets));
}
}
metrics.total_runtime.record(start.elapsed());
}
// send a message to the main task to flag that we're done
let _ = done_tx.send(());
}
/// Spawns worker tasks that pull transactions from a shared channel.
///
/// Returns the sender for distributing transactions to workers.
fn spawn_workers<Tx>(
self,
workers_needed: usize,
task_executor: &Runtime,
to_multi_proof: Option<CrossbeamSender<MultiProofMessage>>,
done_tx: SyncSender<()>,
) -> CrossbeamSender<IndexedTransaction<Tx>>
where
Tx: ExecutableTxFor<Evm> + Send + 'static,
{
let (tx_sender, tx_receiver) = crossbeam_channel::unbounded();
// Spawn workers that all pull from the shared receiver
let span = Span::current();
for idx in 0..workers_needed {
let ctx = self.clone();
let to_multi_proof = to_multi_proof.clone();
let done_tx = done_tx.clone();
let rx = tx_receiver.clone();
let span = debug_span!(target: "engine::tree::payload_processor::prewarm", parent: &span, "prewarm_worker", idx);
task_executor.prewarming_pool().spawn(move || {
let _enter = span.entered();
ctx.transact_batch(rx, to_multi_proof, done_tx);
});
}
tx_sender
Some((evm, self.metrics.clone(), self.terminate_execution.clone()))
}
/// Prefetches a single account and all its storage slots from the BAL into the cache.

View File

@@ -12,6 +12,7 @@ use crossbeam_channel::Receiver;
use reth_primitives_traits::Receipt;
use reth_trie_common::ordered_root::OrderedTrieRootEncodedBuilder;
use tokio::sync::oneshot;
use tracing::debug_span;
/// Receipt with index, ready to be sent to the background task for encoding and trie building.
#[derive(Debug, Clone)]
@@ -65,6 +66,13 @@ impl<R: Receipt> ReceiptRootTaskHandle<R> {
/// * `receipts_len` - The total number of receipts expected. This is needed to correctly order
/// the trie keys according to RLP encoding rules.
pub fn run(self, receipts_len: usize) {
let _span = debug_span!(
target: "engine::tree::payload_processor",
"receipt_root",
receipts_len,
)
.entered();
let mut builder = OrderedTrieRootEncodedBuilder::new(receipts_len);
let mut aggregated_bloom = Bloom::ZERO;
let mut encode_buf = Vec::new();

View File

@@ -1,5 +1,7 @@
//! Sparse Trie task related functionality.
use std::sync::Arc;
use crate::tree::{
multiproof::{
dispatch_with_chunking, evm_state_to_hashed_post_state, MultiProofMessage,
@@ -31,7 +33,7 @@ use reth_trie_sparse::{
SparseTrie,
};
use revm_primitives::{hash_map::Entry, B256Map};
use tracing::{debug, debug_span, error, instrument};
use tracing::{debug, debug_span, error, instrument, trace_span};
/// Maximum number of pending/prewarm updates that we accumulate in memory before actually applying.
const MAX_PENDING_UPDATES: usize = 100;
@@ -51,7 +53,7 @@ pub(super) struct SparseTrieCacheTask<A = ParallelSparseTrie, S = ParallelSparse
/// The size of proof targets chunk to spawn in one calculation.
/// If None, chunking is disabled and all targets are processed in a single proof.
chunk_size: Option<usize>,
chunk_size: usize,
/// If this number is exceeded and chunking is enabled, then this will override whether or not
/// there are any active workers and force chunking across workers. This is to prevent tasks
/// which are very long from hitting a single worker.
@@ -112,7 +114,7 @@ where
proof_worker_handle: ProofWorkerHandle,
metrics: MultiProofTaskMetrics,
trie: SparseStateTrie<A, S>,
chunk_size: Option<usize>,
chunk_size: usize,
) -> Self {
let (proof_result_tx, proof_result_rx) = crossbeam_channel::unbounded();
let (hashed_state_tx, hashed_state_rx) = crossbeam_channel::unbounded();
@@ -192,8 +194,10 @@ where
max_nodes_capacity: usize,
max_values_capacity: usize,
disable_pruning: bool,
updates: &TrieUpdates,
) -> (SparseStateTrie<A, S>, DeferredDrops) {
let Self { mut trie, .. } = self;
trie.commit_updates(updates);
if !disable_pruning {
trie.prune(prune_depth, max_storage_tries);
trie.shrink_to(max_nodes_capacity, max_values_capacity);
@@ -307,7 +311,7 @@ where
self.process_new_updates()?;
self.metrics.sparse_trie_process_updates_duration_histogram.record(t.elapsed());
self.dispatch_pending_targets();
} else if self.pending_targets.len() > self.chunk_size.unwrap_or_default() {
} else if self.pending_targets.len() > self.chunk_size {
// Make sure to dispatch targets if we've accumulated a lot of them.
self.dispatch_pending_targets();
}
@@ -330,7 +334,7 @@ where
Ok(StateRootComputeOutcome {
state_root,
trie_updates,
trie_updates: Arc::new(trie_updates),
#[cfg(feature = "trie-debug")]
debug_recorders,
})
@@ -428,12 +432,12 @@ where
})
}
#[instrument(
level = "debug",
target = "engine::tree::payload_processor::sparse_trie",
skip_all
)]
fn process_new_updates(&mut self) -> SparseTrieResult<()> {
if self.pending_updates == 0 {
return Ok(());
}
let _span = debug_span!("process_new_updates").entered();
self.pending_updates = 0;
// Firstly apply all new storage and account updates to the tries.
@@ -489,50 +493,38 @@ where
let storage_updates =
if new { &mut self.new_storage_updates } else { &mut self.storage_updates };
// Process all storage updates in parallel, skipping tries with no pending updates.
let span = tracing::Span::current();
let storage_results = storage_updates
.iter_mut()
.filter(|(_, updates)| !updates.is_empty())
.map(|(address, updates)| {
let trie = self.trie.take_or_create_storage_trie(address);
let fetched = self.fetched_storage_targets.remove(address).unwrap_or_default();
// Process all storage updates, skipping tries with no pending updates.
let span = debug_span!("process_storage_leaf_updates").entered();
for (address, updates) in storage_updates {
if updates.is_empty() {
continue;
}
let _enter = trace_span!(target: "engine::tree::payload_processor::sparse_trie", parent: &span, "storage_trie_leaf_updates", a=%address).entered();
(address, updates, fetched, trie)
})
.par_bridge_buffered()
.map(|(address, updates, mut fetched, mut trie)| {
let _enter = debug_span!(target: "engine::tree::payload_processor::sparse_trie", parent: &span, "storage_trie_leaf_updates", a=%address).entered();
let mut targets = Vec::new();
let trie = self.trie.get_or_create_storage_trie_mut(*address);
let fetched = self.fetched_storage_targets.entry(*address).or_default();
let mut targets = Vec::new();
trie.update_leaves(updates, |path, min_len| match fetched.entry(path) {
Entry::Occupied(mut entry) => {
if min_len < *entry.get() {
entry.insert(min_len);
targets.push(Target::new(path).with_min_len(min_len));
}
}
Entry::Vacant(entry) => {
trie.update_leaves(updates, |path, min_len| match fetched.entry(path) {
Entry::Occupied(mut entry) => {
if min_len < *entry.get() {
entry.insert(min_len);
targets.push(Target::new(path).with_min_len(min_len));
}
})?;
SparseTrieResult::Ok((address, targets, fetched, trie))
})
.collect::<Result<Vec<_>, _>>()?;
drop(span);
for (address, targets, fetched, trie) in storage_results {
self.fetched_storage_targets.insert(*address, fetched);
self.trie.insert_storage_trie(*address, trie);
}
Entry::Vacant(entry) => {
entry.insert(min_len);
targets.push(Target::new(path).with_min_len(min_len));
}
})?;
if !targets.is_empty() {
self.pending_targets.extend_storage_targets(address, targets);
}
}
drop(span);
// Process account trie updates and fill the account targets.
self.process_account_leaf_updates(new)?;
@@ -668,39 +660,36 @@ where
Ok(())
}
#[instrument(
level = "debug",
target = "engine::tree::payload_processor::sparse_trie",
skip_all
)]
fn dispatch_pending_targets(&mut self) {
if !self.pending_targets.is_empty() {
let (targets, chunking_length) = self.pending_targets.take();
dispatch_with_chunking(
targets,
chunking_length,
self.chunk_size,
self.max_targets_for_chunking,
self.proof_worker_handle.available_account_workers(),
self.proof_worker_handle.available_storage_workers(),
MultiProofTargetsV2::chunks,
|proof_targets| {
if let Err(e) = self.proof_worker_handle.dispatch_account_multiproof(
AccountMultiproofInput {
targets: proof_targets,
proof_result_sender: ProofResultContext::new(
self.proof_result_tx.clone(),
0,
HashedPostState::default(),
Instant::now(),
),
},
) {
error!("failed to dispatch account multiproof: {e:?}");
}
},
);
if self.pending_targets.is_empty() {
return;
}
let _span = debug_span!("dispatch_pending_targets").entered();
let (targets, chunking_length) = self.pending_targets.take();
dispatch_with_chunking(
targets,
chunking_length,
self.chunk_size,
self.max_targets_for_chunking,
self.proof_worker_handle.available_account_workers(),
self.proof_worker_handle.available_storage_workers(),
MultiProofTargetsV2::chunks,
|proof_targets| {
if let Err(e) =
self.proof_worker_handle.dispatch_account_multiproof(AccountMultiproofInput {
targets: proof_targets,
proof_result_sender: ProofResultContext::new(
self.proof_result_tx.clone(),
HashedPostState::default(),
Instant::now(),
),
})
{
error!("failed to dispatch account multiproof: {e:?}");
}
},
);
}
}
@@ -769,12 +758,12 @@ enum SparseTrieTaskMessage {
/// Outcome of the state root computation, including the state root itself with
/// the trie updates.
#[derive(Debug)]
#[derive(Debug, Clone)]
pub struct StateRootComputeOutcome {
/// The state root.
pub state_root: B256,
/// The trie updates.
pub trie_updates: TrieUpdates,
pub trie_updates: Arc<TrieUpdates>,
/// Debug recorders taken from the sparse tries, keyed by `None` for account trie
/// and `Some(address)` for storage tries.
#[cfg(feature = "trie-debug")]

View File

@@ -52,7 +52,7 @@ use std::{
panic::{self, AssertUnwindSafe},
sync::{mpsc::RecvTimeoutError, Arc},
};
use tracing::{debug, debug_span, error, info, instrument, trace, warn};
use tracing::{debug, debug_span, error, info, instrument, trace, warn, Span};
/// Handle to a [`HashedPostState`] computed on a background thread.
type LazyHashedPostState = reth_tasks::LazyHandle<HashedPostState>;
@@ -292,7 +292,7 @@ where
let block = self.convert_to_block(input)?;
// Validate block consensus rules which includes header validation
if let Err(consensus_err) = self.validate_block_inner(&block) {
if let Err(consensus_err) = self.validate_block_inner(&block, None) {
// Header validation error takes precedence over execution error
return Err(InsertBlockError::new(block, consensus_err.into()).into())
}
@@ -334,9 +334,10 @@ where
V: PayloadValidator<T, Block = N::Block> + Clone,
Evm: ConfigureEngineEvm<T::ExecutionData, Primitives = N>,
{
// Spawn block conversion on a background thread so it runs concurrently with the
// Spawn payload conversion on a background thread so it runs concurrently with the
// rest of the function (setup + execution). For payloads this overlaps the cost of
// RLP decoding + header hashing; for already-converted blocks this is a no-op.
// RLP decoding + header hashing.
let is_payload = matches!(&input, BlockOrPayload::Payload(_));
let convert_to_block = match &input {
BlockOrPayload::Payload(_) => {
let payload_clone = input.clone();
@@ -527,18 +528,44 @@ where
hashed_state_provider.hashed_post_state(&hashed_state_output.state)
});
let block = convert_to_block(input)?.with_senders(senders);
let block = convert_to_block(input)?;
let transaction_root = is_payload.then(|| {
let block = block.clone();
let parent_span = Span::current();
let num_hash = block.num_hash();
self.payload_processor.executor().spawn_blocking_named("payload-tx-root", move || {
let _span =
debug_span!(target: "engine::tree::payload_validator", parent: parent_span, "payload_tx_root", block = ?num_hash)
.entered();
block.body().calculate_tx_root()
})
});
let block = block.with_senders(senders);
// Wait for the receipt root computation to complete.
let receipt_root_bloom = receipt_root_rx
.blocking_recv()
.inspect_err(|_| {
tracing::error!(
target: "engine::tree::payload_validator",
"Receipt root task dropped sender without result, receipt root calculation likely aborted"
);
})
.ok();
let receipt_root_bloom = {
let _enter = debug_span!(
target: "engine::tree::payload_validator",
"wait_receipt_root",
)
.entered();
receipt_root_rx
.blocking_recv()
.inspect_err(|_| {
tracing::error!(
target: "engine::tree::payload_validator",
"Receipt root task dropped sender without result, receipt root calculation likely aborted"
);
})
.ok()
};
let transaction_root = transaction_root.map(|handle| {
let _span =
debug_span!(target: "engine::tree::payload_validator", "wait_payload_tx_root")
.entered();
handle.try_into_inner().expect("sole handle")
});
let hashed_state = ensure_ok_post_block!(
self.validate_post_execution(
@@ -546,6 +573,7 @@ where
&parent_block,
&output,
&mut ctx,
transaction_root,
receipt_root_bloom,
hashed_state,
),
@@ -591,7 +619,7 @@ where
let _has_diff = self.compare_trie_updates_with_serial(
overlay_factory.clone(),
&hashed_state,
trie_updates.clone(),
trie_updates.as_ref().clone(),
);
#[cfg(feature = "trie-debug")]
if _has_diff {
@@ -637,7 +665,7 @@ where
?elapsed,
"Regular root task finished"
);
maybe_state_root = Some((result.0, result.1, elapsed));
maybe_state_root = Some((result.0, Arc::new(result.1), elapsed));
}
Err(error) => {
debug!(target: "engine::tree::payload_validator", %error, "Parallel state root computation failed");
@@ -672,7 +700,7 @@ where
self.metrics.block_validation.state_root_task_fallback_success_total.increment(1);
}
(root, updates, root_time.elapsed())
(root, Arc::new(updates), root_time.elapsed())
};
self.metrics.block_validation.record_state_root(&trie_output, root_elapsed.as_secs_f64());
@@ -737,13 +765,19 @@ where
/// Validate if block is correct and satisfies all the consensus rules that concern the header
/// and block body itself.
#[instrument(level = "debug", target = "engine::tree::payload_validator", skip_all)]
fn validate_block_inner(&self, block: &SealedBlock<N::Block>) -> Result<(), ConsensusError> {
fn validate_block_inner(
&self,
block: &SealedBlock<N::Block>,
transaction_root: Option<B256>,
) -> Result<(), ConsensusError> {
if let Err(e) = self.consensus.validate_header(block.sealed_header()) {
error!(target: "engine::tree::payload_validator", ?block, "Failed to validate header {}: {e}", block.hash());
return Err(e)
}
if let Err(e) = self.consensus.validate_block_pre_execution(block) {
if let Err(e) =
self.consensus.validate_block_pre_execution_with_tx_root(block, transaction_root)
{
error!(target: "engine::tree::payload_validator", ?block, "Failed to validate block {}: {e}", block.hash());
return Err(e)
}
@@ -1070,7 +1104,7 @@ where
let (state_root, trie_updates) = result?;
return Ok(Ok(StateRootComputeOutcome {
state_root,
trie_updates,
trie_updates: Arc::new(trie_updates),
#[cfg(feature = "trie-debug")]
debug_recorders: Vec::new(),
}));
@@ -1087,7 +1121,7 @@ where
let (state_root, trie_updates) = result?;
return Ok(Ok(StateRootComputeOutcome {
state_root,
trie_updates,
trie_updates: Arc::new(trie_updates),
#[cfg(feature = "trie-debug")]
debug_recorders: Vec::new(),
}));
@@ -1208,12 +1242,14 @@ where
///
/// The `hashed_state` handle wraps the background hashed post state computation.
#[instrument(level = "debug", target = "engine::tree::payload_validator", skip_all)]
#[expect(clippy::too_many_arguments)]
fn validate_post_execution<T: PayloadTypes<BuiltPayload: BuiltPayload<Primitives = N>>>(
&self,
block: &RecoveredBlock<N::Block>,
parent_block: &SealedHeader<N::BlockHeader>,
output: &BlockExecutionOutput<N::Receipt>,
ctx: &mut TreeCtx<'_, N>,
transaction_root: Option<B256>,
receipt_root_bloom: Option<ReceiptRootBloom>,
hashed_state: LazyHashedPostState,
) -> Result<LazyHashedPostState, InsertBlockErrorKind>
@@ -1224,7 +1260,7 @@ where
trace!(target: "engine::tree::payload_validator", block=?block.num_hash(), "Validating block consensus");
// validate block consensus rules
if let Err(e) = self.validate_block_inner(block) {
if let Err(e) = self.validate_block_inner(block, transaction_root) {
return Err(e.into())
}
@@ -1251,10 +1287,15 @@ where
}
drop(_enter);
// Wait for the background keccak256 hashing task to complete. This blocks until
// all changed addresses and storage slots have been hashed.
let hashed_state_ref =
debug_span!(target: "engine::tree::payload_validator", "wait_hashed_post_state")
.in_scope(|| hashed_state.get());
let _enter = debug_span!(target: "engine::tree::payload_validator", "validate_block_post_execution_with_hashed_state").entered();
if let Err(err) = self
.validator
.validate_block_post_execution_with_hashed_state(hashed_state.get(), block)
if let Err(err) =
self.validator.validate_block_post_execution_with_hashed_state(hashed_state_ref, block)
{
// call post-block hook
self.on_invalid_block(parent_block, block, output, None, ctx.state_mut());
@@ -1479,7 +1520,7 @@ where
execution_outcome: Arc<BlockExecutionOutput<N::Receipt>>,
ctx: &TreeCtx<'_, N>,
hashed_state: LazyHashedPostState,
trie_output: TrieUpdates,
trie_output: Arc<TrieUpdates>,
overlay_factory: OverlayStateProviderFactory<P>,
) -> ExecutedBlock<N> {
// Capture parent hash and ancestor overlays for deferred trie input construction.
@@ -1502,7 +1543,7 @@ where
Err(handle) => Arc::new(handle.get().clone()),
};
let deferred_trie_data =
DeferredTrieData::pending(hashed_state, Arc::new(trie_output), anchor_hash, ancestors);
DeferredTrieData::pending(hashed_state, trie_output, anchor_hash, ancestors);
let deferred_handle_task = deferred_trie_data.clone();
let block_validation_metrics = self.metrics.block_validation.clone();

View File

@@ -97,7 +97,21 @@ where
{
let runner = match self.runner.take() {
Some(runner) => runner,
None => CliRunner::try_default_runtime()?,
None => {
let runtime_config = match &self.cli.command {
Commands::Node(command) => {
reth_tasks::RuntimeConfig::default().with_rayon(RayonConfig {
reserved_cpu_cores: command.engine.reserved_cpu_cores,
proof_storage_worker_threads: command.engine.storage_worker_count,
proof_account_worker_threads: command.engine.account_worker_count,
prewarming_threads: command.engine.prewarming_threads,
..Default::default()
})
}
_ => reth_tasks::RuntimeConfig::default(),
};
CliRunner::try_with_runtime_config(runtime_config)?
}
};
// Add network name if available to the logs dir
@@ -123,8 +137,8 @@ where
///
/// See [`Cli::init_tracing`] for more information.
pub fn init_tracing(&mut self, runner: &CliRunner) -> Result<()> {
if self.guard.is_none() {
self.guard = self.cli.init_tracing(runner, self.layers.take().unwrap_or_default())?;
if let Some(layers) = self.layers.take() {
self.guard = self.cli.init_tracing(runner, layers)?;
}
Ok(())
@@ -161,17 +175,6 @@ where
Rpc::validate_selection(ws_api, "ws.api").map_err(|e| eyre!("{e}"))?;
}
let rayon_config = RayonConfig {
reserved_cpu_cores: command.engine.reserved_cpu_cores,
proof_storage_worker_threads: command.engine.storage_worker_count,
proof_account_worker_threads: command.engine.account_worker_count,
prewarming_threads: command.engine.prewarming_threads,
..Default::default()
};
let runner = CliRunner::try_with_runtime_config(
reth_tasks::RuntimeConfig::default().with_rayon(rayon_config),
)?;
runner.run_command_until_exit(|ctx| {
command.execute(ctx, FnLauncher::new::<C, Ext>(launcher))
})

View File

@@ -1,11 +1,8 @@
//! CLI definition and entrypoint to executable
use crate::{
app::{run_commands_with, CliApp},
chainspec::EthereumChainSpecParser,
};
use crate::{app::CliApp, chainspec::EthereumChainSpecParser};
use clap::{Parser, Subcommand};
use reth_chainspec::{ChainSpec, EthChainSpec, Hardforks};
use reth_chainspec::{ChainSpec, Hardforks};
use reth_cli::chainspec::ChainSpecParser;
use reth_cli_commands::{
common::{CliComponentsBuilder, CliNodeTypes, HeaderMut},
@@ -22,7 +19,6 @@ use reth_node_core::{
args::{LogArgs, OtlpInitStatus, OtlpLogsStatus, TraceArgs},
version::version_metadata,
};
use reth_node_metrics::recorder::install_prometheus_recorder;
use reth_rpc_server_types::{DefaultRpcModuleValidator, RpcModuleValidator};
use reth_tracing::{FileWorkerGuard, Layers};
use std::{ffi::OsString, fmt, future::Future, marker::PhantomData, sync::Arc};
@@ -135,7 +131,8 @@ impl<
Fut: Future<Output = eyre::Result<()>>,
C: ChainSpecParser<ChainSpec = ChainSpec>,
{
self.with_runner(CliRunner::try_default_runtime()?, launcher)
self.configure()
.run(FnLauncher::new::<C, Ext>(async move |builder, ext| launcher(builder, ext).await))
}
/// Execute the configured cli command with the provided [`CliComponentsBuilder`].
@@ -156,7 +153,7 @@ impl<
N: CliNodeTypes<Primitives: NodePrimitives<BlockHeader: HeaderMut>, ChainSpec: Hardforks>,
C: ChainSpecParser<ChainSpec = N::ChainSpec>,
{
self.with_runner_and_components(CliRunner::try_default_runtime()?, components, launcher)
self.configure().run_with_components(components, launcher)
}
/// Execute the configured cli command with the provided [`CliRunner`].
@@ -192,7 +189,7 @@ impl<
/// Execute the configured cli command with the provided [`CliRunner`] and
/// [`CliComponentsBuilder`].
pub fn with_runner_and_components<N>(
mut self,
self,
runner: CliRunner,
components: impl CliComponentsBuilder<N>,
launcher: impl AsyncFnOnce(
@@ -204,24 +201,9 @@ impl<
N: CliNodeTypes<Primitives: NodePrimitives<BlockHeader: HeaderMut>, ChainSpec: Hardforks>,
C: ChainSpecParser<ChainSpec = N::ChainSpec>,
{
// Add network name if available to the logs dir
if let Some(chain_spec) = self.command.chain_spec() {
self.logs.log_file_directory =
self.logs.log_file_directory.join(chain_spec.chain().to_string());
}
// Apply node-specific log defaults before initializing tracing
if matches!(self.command, Commands::Node(_)) {
self.logs.apply_node_defaults();
}
let _guard = self.init_tracing(&runner, Layers::new())?;
// Install the prometheus recorder to be sure to record all metrics
install_prometheus_recorder();
// Use the shared standalone function to avoid duplication
run_commands_with::<C, Ext, Rpc, N, SubCmd>(self, runner, components, launcher)
let mut app = self.configure();
app.set_runner(runner);
app.run_with_components(components, launcher)
}
/// Initializes tracing with the configured options.
@@ -368,7 +350,7 @@ mod tests {
use super::*;
use crate::chainspec::SUPPORTED_CHAINS;
use clap::CommandFactory;
use reth_chainspec::SEPOLIA;
use reth_chainspec::{EthChainSpec, SEPOLIA};
use reth_node_core::args::ColorMode;
#[test]

View File

@@ -15,13 +15,16 @@ use alloc::{fmt::Debug, sync::Arc};
use alloy_consensus::{constants::MAXIMUM_EXTRA_DATA_SIZE, EMPTY_OMMER_ROOT_HASH};
use alloy_eips::eip7840::BlobParams;
use reth_chainspec::{EthChainSpec, EthereumHardforks};
use reth_consensus::{Consensus, ConsensusError, FullConsensus, HeaderValidator, ReceiptRootBloom};
use reth_consensus::{
Consensus, ConsensusError, FullConsensus, HeaderValidator, ReceiptRootBloom, TransactionRoot,
};
use reth_consensus_common::validation::{
validate_4844_header_standalone, validate_against_parent_4844,
validate_against_parent_eip1559_base_fee, validate_against_parent_gas_limit,
validate_against_parent_hash_number, validate_against_parent_timestamp,
validate_block_pre_execution, validate_body_against_header, validate_header_base_fee,
validate_header_extra_data, validate_header_gas,
validate_block_pre_execution, validate_block_pre_execution_with_tx_root,
validate_body_against_header, validate_header_base_fee, validate_header_extra_data,
validate_header_gas,
};
use reth_execution_types::BlockExecutionResult;
use reth_primitives_traits::{
@@ -102,6 +105,14 @@ where
fn validate_block_pre_execution(&self, block: &SealedBlock<B>) -> Result<(), ConsensusError> {
validate_block_pre_execution(block, &self.chain_spec)
}
fn validate_block_pre_execution_with_tx_root(
&self,
block: &SealedBlock<B>,
transaction_root: Option<TransactionRoot>,
) -> Result<(), ConsensusError> {
validate_block_pre_execution_with_tx_root(block, &self.chain_spec, transaction_root)
}
}
impl<H, ChainSpec> HeaderValidator<H> for EthBeaconConsensus<ChainSpec>

View File

@@ -253,8 +253,8 @@ where
// There's only limited amount of blob space available per block, so we need to check if
// the EIP-4844 can still fit in the block
let mut blob_tx_sidecar = None;
if let Some(blob_tx) = tx.as_eip4844() {
let tx_blob_count = blob_tx.tx().blob_versioned_hashes.len() as u64;
if let Some(blob_hashes) = tx.blob_versioned_hashes() {
let tx_blob_count = blob_hashes.len() as u64;
if block_blob_count + tx_blob_count > max_blob_count {
// we can't fit this _blob_ transaction into the block, so we mark it as
@@ -329,8 +329,8 @@ where
};
// add to the total blob gas used if the transaction successfully executed
if let Some(blob_tx) = tx.as_eip4844() {
block_blob_count += blob_tx.tx().blob_versioned_hashes.len() as u64;
if let Some(blob_hashes) = tx.blob_versioned_hashes() {
block_blob_count += blob_hashes.len() as u64;
// if we've reached the max blob count, we can skip blob txs entirely
if block_blob_count == max_blob_count {

View File

@@ -165,13 +165,8 @@ pub enum SparseTrieErrorKind {
#[error("sparse trie is blind")]
Blind,
/// Encountered blinded node on update.
#[error("attempted to update blind node at {path:?}: {hash}")]
BlindedNode {
/// Blind node path.
path: Nibbles,
/// Node hash
hash: B256,
},
#[error("attempted to update blind node at {0:?}")]
BlindedNode(Nibbles),
/// Encountered unexpected node at path when revealing.
#[error("encountered an invalid node at path {path:?} when revealing: {node:?}")]
Reveal {

View File

@@ -30,6 +30,6 @@ pub use peers::{
DEFAULT_REPUTATION,
},
state::PeerConnectionState,
ConnectionsConfig, Peer, PeersConfig,
ConnectionsConfig, Peer, PeersConfig, PersistedPeerInfo,
};
pub use session::{SessionLimits, SessionsConfig};

View File

@@ -11,7 +11,7 @@ use reth_net_banlist::{BanList, IpFilter};
use reth_network_peers::{NodeRecord, TrustedPeer};
use tracing::info;
use crate::{BackoffKind, ReputationChangeWeights};
use crate::{peers::PersistedPeerInfo, BackoffKind, ReputationChangeWeights};
/// Maximum number of available slots for outbound sessions.
pub const DEFAULT_MAX_COUNT_PEERS_OUTBOUND: u32 = 100;
@@ -147,6 +147,9 @@ pub struct PeersConfig {
/// Basic nodes to connect to.
#[cfg_attr(feature = "serde", serde(skip))]
pub basic_nodes: HashSet<NodeRecord>,
/// Peers restored from a previous run, containing richer metadata than basic nodes.
#[cfg_attr(feature = "serde", serde(skip))]
pub persisted_peers: Vec<PersistedPeerInfo>,
/// How long to ban bad peers.
#[cfg_attr(feature = "serde", serde(with = "humantime_serde"))]
pub ban_duration: Duration,
@@ -193,6 +196,7 @@ impl Default for PeersConfig {
trusted_nodes_only: false,
trusted_nodes_resolution_interval: Duration::from_secs(60 * 60),
basic_nodes: Default::default(),
persisted_peers: Default::default(),
max_backoff_count: 5,
incoming_ip_throttle_duration: INBOUND_IP_THROTTLE_DURATION,
ip_filter: IpFilter::default(),
@@ -298,20 +302,39 @@ impl PeersConfig {
self.connection_info.max_outbound + self.connection_info.max_inbound
}
/// Read from file nodes available at launch. Ignored if None.
/// Read persisted peers from file at launch.
///
/// Supports both the current [`PersistedPeerInfo`] format and the legacy `Vec<NodeRecord>`
/// format. Legacy entries are converted to [`PersistedPeerInfo`] with default metadata.
///
/// Ignored if `optional_file` is `None` or the file does not exist.
#[cfg(feature = "serde")]
pub fn with_basic_nodes_from_file(
self,
mut self,
optional_file: Option<impl AsRef<Path>>,
) -> Result<Self, io::Error> {
let Some(file_path) = optional_file else { return Ok(self) };
let reader = match std::fs::File::open(file_path.as_ref()) {
Ok(file) => io::BufReader::new(file),
let raw = match std::fs::read_to_string(file_path.as_ref()) {
Ok(contents) => contents,
Err(e) if e.kind() == ErrorKind::NotFound => return Ok(self),
Err(e) => Err(e)?,
Err(e) => return Err(e),
};
info!(target: "net::peers", file = %file_path.as_ref().display(), "Loading saved peers");
let nodes: HashSet<NodeRecord> = serde_json::from_reader(reader)?;
Ok(self.with_basic_nodes(nodes))
// Try the new format first, fall back to legacy Vec<NodeRecord>
let peers: Vec<PersistedPeerInfo> = serde_json::from_str(&raw)
.or_else(|_| {
let nodes: HashSet<NodeRecord> = serde_json::from_str(&raw)?;
Ok::<_, serde_json::Error>(
nodes.into_iter().map(PersistedPeerInfo::from_node_record).collect(),
)
})
.map_err(|e| io::Error::new(io::ErrorKind::InvalidData, e))?;
info!(target: "net::peers", count = peers.len(), "Loaded persisted peers");
self.persisted_peers = peers;
Ok(self)
}
/// Configure the IP filter for restricting network connections to specific IP ranges.

View File

@@ -2,6 +2,8 @@
/// Represents the kind of peer
#[derive(Debug, Clone, Copy, Default, Eq, PartialEq)]
#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
#[cfg_attr(feature = "serde", serde(rename_all = "lowercase"))]
pub enum PeerKind {
/// Basic peer kind.
#[default]

View File

@@ -8,6 +8,7 @@ pub use config::{ConnectionsConfig, PeersConfig};
pub use reputation::{Reputation, ReputationChange, ReputationChangeKind, ReputationChangeWeights};
use alloy_eip2124::ForkId;
use reth_network_peers::{NodeRecord, PeerId};
use tracing::trace;
use crate::{
@@ -140,3 +141,33 @@ impl Peer {
matches!(self.kind, PeerKind::Static)
}
}
/// Peer info persisted to disk.
///
/// Contains richer metadata than a plain [`NodeRecord`], preserving the peer's kind, fork ID,
/// and reputation across restarts.
#[derive(Debug, Clone, PartialEq, Eq)]
#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
pub struct PersistedPeerInfo {
/// The node record (id, address, ports).
pub record: NodeRecord,
/// The kind of peer.
pub kind: PeerKind,
/// The [`ForkId`] that the peer announced via discovery.
#[cfg_attr(feature = "serde", serde(default, skip_serializing_if = "Option::is_none"))]
pub fork_id: Option<ForkId>,
/// The peer's reputation at the time of persisting.
pub reputation: i32,
}
impl PersistedPeerInfo {
/// Returns the peer id.
pub const fn peer_id(&self) -> PeerId {
self.record.id
}
/// Converts a legacy [`NodeRecord`] into a [`PersistedPeerInfo`] with default metadata.
pub const fn from_node_record(record: NodeRecord) -> Self {
Self { record, kind: PeerKind::Basic, fork_id: None, reputation: DEFAULT_REPUTATION }
}
}

View File

@@ -33,7 +33,7 @@ reth-storage-api.workspace = true
reth-tokio-util.workspace = true
reth-consensus.workspace = true
reth-network-peers = { workspace = true, features = ["net"] }
reth-network-types.workspace = true
reth-network-types = { workspace = true, features = ["serde"] }
# ethereum
alloy-consensus.workspace = true
@@ -105,7 +105,6 @@ serde = [
"dep:serde",
"secp256k1/serde",
"enr/serde",
"reth-network-types/serde",
"reth-dns-discovery/serde",
"reth-eth-wire/serde",
"reth-eth-wire-types/serde",
@@ -125,6 +124,7 @@ serde = [
"reth-network-api/serde",
"rand_08/serde",
"reth-storage-api/serde",
"reth-network-types/serde",
]
test-utils = [
"reth-transaction-pool/test-utils",

View File

@@ -448,10 +448,13 @@ impl<N: NetworkPrimitives> NetworkManager<N> {
/// Collect the peers from the [`NetworkManager`] and write them to the given
/// `persistent_peers_file`.
///
/// Only persists peers that are not currently backed off or banned. Includes metadata like
/// peer kind, fork ID, and reputation.
pub fn write_peers_to_file(&self, persistent_peers_file: &Path) -> Result<(), FsPathError> {
let known_peers = self.all_peers().collect::<Vec<_>>();
let peers = self.swarm.state().peers().persistable_peers().collect::<Vec<_>>();
persistent_peers_file.parent().map(fs::create_dir_all).transpose()?;
reth_fs_util::write_json_file(persistent_peers_file, &known_peers)?;
reth_fs_util::write_json_file(persistent_peers_file, &peers)?;
Ok(())
}

View File

@@ -20,7 +20,7 @@ use reth_network_types::{
reputation::{DEFAULT_REPUTATION, MAX_TRUSTED_PEER_REPUTATION_CHANGE},
},
ConnectionsConfig, Peer, PeerAddr, PeerConnectionState, PeerKind, PeersConfig,
ReputationChangeKind, ReputationChangeOutcome, ReputationChangeWeights,
PersistedPeerInfo, ReputationChangeKind, ReputationChangeOutcome, ReputationChangeWeights,
};
use std::{
collections::{hash_map::Entry, HashMap, HashSet, VecDeque},
@@ -111,6 +111,7 @@ impl PeersManager {
trusted_nodes_only,
trusted_nodes_resolution_interval,
basic_nodes,
persisted_peers,
max_backoff_count,
incoming_ip_throttle_duration,
ip_filter,
@@ -122,7 +123,8 @@ impl PeersManager {
// We use half of the interval to decrease the max duration to `150%` in worst case
let unban_interval = ban_duration.min(backoff_durations.low) / 2;
let mut peers = HashMap::with_capacity(trusted_nodes.len() + basic_nodes.len());
let mut peers =
HashMap::with_capacity(trusted_nodes.len() + basic_nodes.len() + persisted_peers.len());
let mut trusted_peer_ids = HashSet::with_capacity(trusted_nodes.len());
for trusted_peer in &trusted_nodes {
@@ -139,6 +141,19 @@ impl PeersManager {
}
}
for PersistedPeerInfo { record, kind, fork_id, reputation } in persisted_peers {
let NodeRecord { address, tcp_port, udp_port, id } = record;
peers.entry(id).or_insert_with(|| {
let mut peer = Peer::with_kind(
PeerAddr::new_with_ports(address, tcp_port, Some(udp_port)),
kind,
);
peer.fork_id = fork_id.map(Box::new);
peer.reputation = reputation;
peer
});
}
for NodeRecord { address, tcp_port, udp_port, id } in basic_nodes {
peers.entry(id).or_insert_with(|| {
Peer::new(PeerAddr::new_with_ports(address, tcp_port, Some(udp_port)))
@@ -191,7 +206,7 @@ impl PeersManager {
self.peers.len()
}
/// Returns an iterator over all peers
/// Returns an iterator over all peers as [`NodeRecord`]s.
pub(crate) fn iter_peers(&self) -> impl Iterator<Item = NodeRecord> + '_ {
self.peers.iter().map(|(peer_id, v)| {
NodeRecord::new_with_ports(
@@ -203,6 +218,26 @@ impl PeersManager {
})
}
/// Returns an iterator over peers suitable for persisting to disk.
///
/// Filters out backed-off and banned peers, and includes metadata like kind, fork ID, and
/// reputation.
pub(crate) fn persistable_peers(&self) -> impl Iterator<Item = PersistedPeerInfo> + '_ {
self.peers.iter().filter(|(_, peer)| !peer.is_backed_off() && !peer.is_banned()).map(
|(peer_id, peer)| PersistedPeerInfo {
record: NodeRecord::new_with_ports(
peer.addr.tcp().ip(),
peer.addr.tcp().port(),
peer.addr.udp().map(|addr| addr.port()),
*peer_id,
),
kind: peer.kind,
fork_id: peer.fork_id.as_deref().copied(),
reputation: peer.reputation,
},
)
}
/// Returns the `NodeRecord` and `PeerKind` for the given peer id
pub(crate) fn peer_by_id(&self, peer_id: PeerId) -> Option<(NodeRecord, PeerKind)> {
self.peers.get(&peer_id).map(|v| {
@@ -939,11 +974,14 @@ impl PeersManager {
self.trusted_peer_ids.remove(&peer_id);
}
/// Returns the idle peer with the highest reputation.
/// Returns the best idle peer to connect to.
///
/// Peers that are `trusted` or `static`, see [`PeerKind`], are prioritized as long as they're
/// not currently marked as banned or backed off.
///
/// Among remaining peers, the one with the highest reputation is selected. When reputation is
/// equal, a peer with a discovered `fork_id` is preferred since it indicates a compatible fork.
///
/// If `trusted_nodes_only` is enabled, see [`PeersConfig`], then this will only consider
/// `trusted` peers.
///
@@ -969,9 +1007,15 @@ impl PeersManager {
return Some((*maybe_better.0, maybe_better.1))
}
// otherwise we keep track of the best peer using the reputation
if maybe_better.1.reputation > best_peer.1.reputation {
best_peer = maybe_better;
// prefer higher reputation, break ties by fork_id presence
match maybe_better.1.reputation.cmp(&best_peer.1.reputation) {
std::cmp::Ordering::Greater => best_peer = maybe_better,
std::cmp::Ordering::Equal
if maybe_better.1.fork_id.is_some() && best_peer.1.fork_id.is_none() =>
{
best_peer = maybe_better
}
_ => {}
}
}
Some((*best_peer.0, best_peer.1))
@@ -1294,6 +1338,7 @@ mod tests {
errors::{EthHandshakeError, EthStreamError, P2PHandshakeError, P2PStreamError},
DisconnectReason,
};
use reth_ethereum_forks::{ForkHash, ForkId};
use reth_net_banlist::BanList;
use reth_network_api::Direction;
use reth_network_peers::{PeerId, TrustedPeer};
@@ -3238,4 +3283,23 @@ mod tests {
assert!(peers.on_incoming_pending_session(ip2).is_ok());
assert!(peers.on_incoming_pending_session(ip3).is_ok());
}
#[tokio::test]
async fn test_best_unconnected_prefers_fork_id_as_tiebreaker() {
let mut peers = PeersManager::default();
let addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), 8008);
let fork_id = ForkId { hash: ForkHash([0xaa, 0xbb, 0xcc, 0xdd]), next: 0 };
// add two peers with equal reputation, only one has a fork_id
let no_fork = PeerId::random();
peers.add_peer(no_fork, PeerAddr::from_tcp(addr), None);
let with_fork = PeerId::random();
peers.add_peer(with_fork, PeerAddr::from_tcp(addr), None);
peers.peers.get_mut(&with_fork).unwrap().fork_id = Some(Box::new(fork_id));
let (best_id, _) = peers.best_unconnected().unwrap();
assert_eq!(best_id, with_fork, "fork_id should break tie when reputation is equal");
}
}

View File

@@ -904,7 +904,7 @@ where
// send hashes if any
if let Some(new_pooled_hashes) = pooled {
for hash in new_pooled_hashes.iter_hashes().copied() {
propagated.0.entry(hash).or_default().push(PropagateKind::Hash(peer_id));
propagated.record(hash, PropagateKind::Hash(peer_id));
// mark transaction as seen by peer
peer.seen_transactions.insert(hash);
}
@@ -916,7 +916,7 @@ where
// send full transactions, if any
if let Some(new_full_transactions) = full {
for tx in &new_full_transactions {
propagated.0.entry(*tx.tx_hash()).or_default().push(PropagateKind::Full(peer_id));
propagated.record(*tx.tx_hash(), PropagateKind::Full(peer_id));
// mark transaction as seen by peer
peer.seen_transactions.insert(*tx.tx_hash());
}
@@ -926,7 +926,7 @@ where
}
// Update propagated transactions metrics
self.metrics.propagated_transactions.increment(propagated.0.len() as u64);
self.metrics.propagated_transactions.increment(propagated.len() as u64);
Some(propagated)
}
@@ -977,7 +977,7 @@ where
}
for hash in new_pooled_hashes.iter_hashes().copied() {
propagated.0.entry(hash).or_default().push(PropagateKind::Hash(peer_id));
propagated.record(hash, PropagateKind::Hash(peer_id));
}
trace!(target: "net::tx::propagation", ?peer_id, ?new_pooled_hashes, "Propagating transactions to peer");
@@ -986,7 +986,7 @@ where
self.network.send_transactions_hashes(peer_id, new_pooled_hashes);
// Update propagated transactions metrics
self.metrics.propagated_transactions.increment(propagated.0.len() as u64);
self.metrics.propagated_transactions.increment(propagated.len() as u64);
propagated
};
@@ -1057,7 +1057,7 @@ where
.truncate(SOFT_LIMIT_COUNT_HASHES_IN_NEW_POOLED_TRANSACTIONS_BROADCAST_MESSAGE);
for hash in new_pooled_hashes.iter_hashes().copied() {
propagated.0.entry(hash).or_default().push(PropagateKind::Hash(*peer_id));
propagated.record(hash, PropagateKind::Hash(*peer_id));
// mark transaction as seen by peer
peer.seen_transactions.insert(hash);
}
@@ -1071,11 +1071,7 @@ where
// send full transactions, if any
if let Some(new_full_transactions) = full {
for tx in &new_full_transactions {
propagated
.0
.entry(*tx.tx_hash())
.or_default()
.push(PropagateKind::Full(*peer_id));
propagated.record(*tx.tx_hash(), PropagateKind::Full(*peer_id));
// mark transaction as seen by peer
peer.seen_transactions.insert(*tx.tx_hash());
}
@@ -1088,7 +1084,7 @@ where
}
// Update propagated transactions metrics
self.metrics.propagated_transactions.increment(propagated.0.len() as u64);
self.metrics.propagated_transactions.increment(propagated.len() as u64);
propagated
}
@@ -1236,7 +1232,7 @@ where
msg_builder.push_pooled(pooled_tx);
}
debug!(target: "net::tx", ?peer_id, tx_count = msg_builder.is_empty(), "Broadcasting transaction hashes");
debug!(target: "net::tx", ?peer_id, tx_count = msg_builder.len(), "Broadcasting transaction hashes");
let msg = msg_builder.build();
self.network.send_transactions_hashes(peer_id, msg);
}
@@ -1924,6 +1920,14 @@ impl PooledTransactionsHashesBuilder {
}
}
/// Returns the number of transactions in the builder.
fn len(&self) -> usize {
match self {
Self::Eth66(hashes) => hashes.len(),
Self::Eth68(hashes) => hashes.len(),
}
}
/// Appends all hashes
fn extend<T: SignedTransaction>(
&mut self,
@@ -2903,12 +2907,12 @@ mod tests {
let propagated =
tx_manager.propagate_transactions(propagate.clone(), PropagationMode::Basic);
assert_eq!(propagated.0.len(), 2);
let prop_txs = propagated.0.get(eip1559_tx.transaction.hash()).unwrap();
assert_eq!(propagated.len(), 2);
let prop_txs = propagated.get(eip1559_tx.transaction.hash()).unwrap();
assert_eq!(prop_txs.len(), 1);
assert!(prop_txs[0].is_full());
let prop_txs = propagated.0.get(eip4844_tx.transaction.hash()).unwrap();
let prop_txs = propagated.get(eip4844_tx.transaction.hash()).unwrap();
assert_eq!(prop_txs.len(), 1);
assert!(prop_txs[0].is_hash());
@@ -2919,7 +2923,7 @@ mod tests {
// propagate again
let propagated = tx_manager.propagate_transactions(propagate, PropagationMode::Basic);
assert!(propagated.0.is_empty());
assert!(propagated.is_empty());
}
#[tokio::test]

View File

@@ -9,7 +9,7 @@ use crate::{
NodeBuilderWithComponents, NodeComponents, NodeComponentsBuilder, NodeHandle, NodeTypesAdapter,
};
use alloy_consensus::BlockHeader;
use futures::{stream_select, FutureExt, StreamExt};
use futures::{stream::FusedStream, stream_select, FutureExt, StreamExt};
use reth_chainspec::{EthChainSpec, EthereumHardforks};
use reth_engine_tree::{
chain::{ChainEvent, FromOrchestrator},
@@ -356,7 +356,7 @@ impl EngineNodeLauncher {
}
}
}
payload = built_payloads.select_next_some() => {
payload = built_payloads.select_next_some(), if !built_payloads.is_terminated() => {
if let Some(executed_block) = payload.executed_block() {
debug!(target: "reth::cli", block=?executed_block.recovered_block.num_hash(), "inserting built payload");
orchestrator.handler_mut().handler_mut().on_event(EngineApiRequest::InsertExecutedBlock(executed_block.into_executed_payload()).into());

View File

@@ -24,6 +24,8 @@ pub struct ExExLauncher<Node: FullNodeComponents> {
config_container: WithConfigs<<Node::Types as NodeTypes>::ChainSpec>,
/// The threshold for the number of blocks in the WAL before emitting a warning.
wal_blocks_warning: usize,
/// The max notification buffer capacity for the ExEx manager.
capacity: usize,
}
impl<Node: FullNodeComponents + Clone> ExExLauncher<Node> {
@@ -40,6 +42,7 @@ impl<Node: FullNodeComponents + Clone> ExExLauncher<Node> {
components,
config_container,
wal_blocks_warning: DEFAULT_WAL_BLOCKS_WARNING,
capacity: DEFAULT_EXEX_MANAGER_CAPACITY,
}
}
@@ -53,6 +56,12 @@ impl<Node: FullNodeComponents + Clone> ExExLauncher<Node> {
self
}
/// Sets the max notification buffer capacity for the [`ExExManager`].
pub const fn with_capacity(mut self, capacity: usize) -> Self {
self.capacity = capacity;
self
}
/// Launches all execution extensions.
///
/// Spawns all extensions and returns the handle to the exex manager if any extensions are
@@ -60,7 +69,8 @@ impl<Node: FullNodeComponents + Clone> ExExLauncher<Node> {
pub async fn launch(
self,
) -> eyre::Result<Option<ExExManagerHandle<PrimitivesTy<Node::Types>>>> {
let Self { head, extensions, components, config_container, wal_blocks_warning } = self;
let Self { head, extensions, components, config_container, wal_blocks_warning, capacity } =
self;
let head = BlockNumHash::new(head.number, head.hash);
if extensions.is_empty() {
@@ -134,7 +144,7 @@ impl<Node: FullNodeComponents + Clone> ExExLauncher<Node> {
let exex_manager = ExExManager::new(
components.provider().clone(),
exex_handles,
DEFAULT_EXEX_MANAGER_CAPACITY,
capacity,
exex_wal,
components.provider().finalized_block_stream(),
)

View File

@@ -1020,7 +1020,13 @@ where
.with_executor(node.task_executor().clone())
.with_evm_config(node.evm_config().clone())
.with_consensus(node.consensus().clone())
.build_with_auth_server(module_config, engine_api, eth_api, engine_events.clone());
.build_with_auth_server(
module_config,
engine_api,
eth_api,
engine_events.clone(),
beacon_engine_handle.clone(),
);
// in dev mode we generate 20 random dev-signer accounts
if config.dev.dev {

View File

@@ -59,7 +59,6 @@ url.workspace = true
ipnet.workspace = true
# io
dirs-next.workspace = true
shellexpand.workspace = true
# obs
tracing.workspace = true

View File

@@ -29,7 +29,6 @@ pub struct DefaultEngineValues {
cross_block_cache_size: usize,
state_root_task_compare_updates: bool,
accept_execution_requests_hash: bool,
multiproof_chunking_enabled: bool,
multiproof_chunk_size: usize,
reserved_cpu_cores: usize,
precompile_cache_disabled: bool,
@@ -111,12 +110,6 @@ impl DefaultEngineValues {
self
}
/// Set whether to enable multiproof chunking by default
pub const fn with_multiproof_chunking_enabled(mut self, v: bool) -> Self {
self.multiproof_chunking_enabled = v;
self
}
/// Set the default multiproof chunk size
pub const fn with_multiproof_chunk_size(mut self, v: usize) -> Self {
self.multiproof_chunk_size = v;
@@ -217,7 +210,6 @@ impl Default for DefaultEngineValues {
cross_block_cache_size: DEFAULT_CROSS_BLOCK_CACHE_SIZE_MB,
state_root_task_compare_updates: false,
accept_execution_requests_hash: false,
multiproof_chunking_enabled: true,
multiproof_chunk_size: DEFAULT_MULTIPROOF_TASK_CHUNK_SIZE,
reserved_cpu_cores: DEFAULT_RESERVED_CPU_CORES,
precompile_cache_disabled: false,
@@ -300,10 +292,6 @@ pub struct EngineArgs {
#[arg(long = "engine.accept-execution-requests-hash", default_value_t = DefaultEngineValues::get_global().accept_execution_requests_hash)]
pub accept_execution_requests_hash: bool,
/// Whether multiproof task should chunk proof targets.
#[arg(long = "engine.multiproof-chunking", default_value_t = DefaultEngineValues::get_global().multiproof_chunking_enabled)]
pub multiproof_chunking_enabled: bool,
/// Multiproof task chunk size for proof targets.
#[arg(long = "engine.multiproof-chunk-size", default_value_t = DefaultEngineValues::get_global().multiproof_chunk_size)]
pub multiproof_chunk_size: usize,
@@ -404,7 +392,6 @@ impl Default for EngineArgs {
cross_block_cache_size,
state_root_task_compare_updates,
accept_execution_requests_hash,
multiproof_chunking_enabled,
multiproof_chunk_size,
reserved_cpu_cores,
precompile_cache_disabled,
@@ -433,7 +420,6 @@ impl Default for EngineArgs {
state_provider_metrics,
cross_block_cache_size,
accept_execution_requests_hash,
multiproof_chunking_enabled,
multiproof_chunk_size,
reserved_cpu_cores,
precompile_cache_enabled: true,
@@ -467,7 +453,6 @@ impl EngineArgs {
.with_state_provider_metrics(self.state_provider_metrics)
.with_always_compare_trie_updates(self.state_root_task_compare_updates)
.with_cross_block_cache_size(self.cross_block_cache_size * 1024 * 1024)
.with_multiproof_chunking_enabled(self.multiproof_chunking_enabled)
.with_multiproof_chunk_size(self.multiproof_chunk_size)
.with_reserved_cpu_cores(self.reserved_cpu_cores)
.without_precompile_cache(self.precompile_cache_disabled)
@@ -521,7 +506,6 @@ mod tests {
cross_block_cache_size: 256,
state_root_task_compare_updates: true,
accept_execution_requests_hash: true,
multiproof_chunking_enabled: true,
multiproof_chunk_size: 512,
reserved_cpu_cores: 4,
precompile_cache_enabled: true,
@@ -553,7 +537,6 @@ mod tests {
"256",
"--engine.state-root-task-compare-updates",
"--engine.accept-execution-requests-hash",
"--engine.multiproof-chunking",
"--engine.multiproof-chunk-size",
"512",
"--engine.reserved-cpu-cores",

View File

@@ -1104,9 +1104,9 @@ mod tests {
let net_cfg = builder.build_with_noop_provider(MAINNET.clone());
// Assert basic_nodes contains our node
// Assert persisted_peers contains our node (legacy format is auto-converted)
let node: NodeRecord = enode.parse().unwrap();
assert!(net_cfg.peers_config.basic_nodes.contains(&node));
assert!(net_cfg.peers_config.persisted_peers.iter().any(|p| p.record == node));
// Cleanup
let _ = fs::remove_file(&peers_file);

View File

@@ -3,7 +3,7 @@
use crate::{args::DatadirArgs, utils::parse_path};
use reth_chainspec::Chain;
use std::{
env::VarError,
convert::Infallible,
fmt::{Debug, Display, Formatter},
path::{Path, PathBuf},
str::FromStr,
@@ -86,8 +86,7 @@ pub trait XdgPath {
/// A wrapper type that either parses a user-given path or defaults to an
/// OS-specific path.
///
/// The [`FromStr`] implementation supports shell expansions and common patterns such as `~` for the
/// home directory.
/// The [`FromStr`] implementation parses a string into a path.
///
/// # Example
///
@@ -127,10 +126,10 @@ impl<D: XdgPath> Default for PlatformPath<D> {
}
impl<D> FromStr for PlatformPath<D> {
type Err = shellexpand::LookupError<VarError>;
type Err = Infallible;
fn from_str(s: &str) -> Result<Self, Self::Err> {
Ok(Self(parse_path(s)?, std::marker::PhantomData))
Ok(Self(parse_path(s), std::marker::PhantomData))
}
}
@@ -235,7 +234,7 @@ impl<D> Default for MaybePlatformPath<D> {
}
impl<D> FromStr for MaybePlatformPath<D> {
type Err = shellexpand::LookupError<VarError>;
type Err = Infallible;
fn from_str(s: &str) -> Result<Self, Self::Err> {
let p = match s {

View File

@@ -10,16 +10,12 @@ use reth_network_p2p::{
bodies::client::BodiesClient, headers::client::HeadersClient, priority::Priority,
};
use reth_primitives_traits::{Block, SealedBlock, SealedHeader};
use std::{
env::VarError,
path::{Path, PathBuf},
};
use std::path::{Path, PathBuf};
use tracing::{debug, info};
/// Parses a user-specified path with support for environment variables and common shorthands (e.g.
/// ~ for the user's home directory).
pub fn parse_path(value: &str) -> Result<PathBuf, shellexpand::LookupError<VarError>> {
shellexpand::full(value).map(|path| PathBuf::from(path.into_owned()))
/// Parses a user-specified path into a [`PathBuf`].
pub fn parse_path(value: &str) -> PathBuf {
PathBuf::from(value)
}
/// Attempts to retrieve or create a JWT secret from the specified path.

View File

@@ -168,7 +168,7 @@ pub use alloy_primitives::{logs_bloom, Log, LogData};
pub mod proofs;
mod storage;
pub use storage::{StorageEntry, StorageSlotKey, ValueWithSubKey};
pub use storage::{StorageEntry, ValueWithSubKey};
pub mod sync;

View File

@@ -145,9 +145,10 @@ impl<T: RlpBincode + 'static> SerdeBincodeCompat for T {
mod block_bincode {
use crate::serde_bincode_compat::SerdeBincodeCompat;
use alloc::{borrow::Cow, vec::Vec};
use alloy_consensus::TxEip4844;
use alloy_consensus::TxTy;
use alloy_eips::eip4895::Withdrawals;
use serde::{Deserialize, Deserializer, Serialize, Serializer};
use core::fmt::Debug;
use serde::{de::DeserializeOwned, Deserialize, Deserializer, Serialize, Serializer};
use serde_with::{DeserializeAs, SerializeAs};
/// Bincode-compatible [`alloy_consensus::Block`] serde implementation.
@@ -319,9 +320,11 @@ mod block_bincode {
}
}
impl super::SerdeBincodeCompat for alloy_consensus::EthereumTxEnvelope<TxEip4844> {
impl<T: Clone + Serialize + DeserializeOwned + Debug + 'static> super::SerdeBincodeCompat
for alloy_consensus::EthereumTxEnvelope<T>
{
type BincodeRepr<'a> =
alloy_consensus::serde_bincode_compat::transaction::EthereumTxEnvelope<'a>;
alloy_consensus::serde_bincode_compat::transaction::EthereumTxEnvelope<'a, T>;
fn as_repr(&self) -> Self::BincodeRepr<'_> {
self.into()
@@ -346,8 +349,10 @@ mod block_bincode {
}
}
impl super::SerdeBincodeCompat for alloy_consensus::EthereumReceipt {
type BincodeRepr<'a> = alloy_consensus::serde_bincode_compat::EthereumReceipt<'a>;
impl<T: TxTy + Serialize + DeserializeOwned> super::SerdeBincodeCompat
for alloy_consensus::EthereumReceipt<T>
{
type BincodeRepr<'a> = alloy_consensus::serde_bincode_compat::EthereumReceipt<'a, T>;
fn as_repr(&self) -> Self::BincodeRepr<'_> {
self.into()

View File

@@ -1,4 +1,4 @@
use alloy_primitives::{keccak256, B256, U256};
use alloy_primitives::{B256, U256};
/// Trait for `DupSort` table values that contain a subkey.
///
@@ -12,117 +12,6 @@ pub trait ValueWithSubKey {
fn get_subkey(&self) -> Self::SubKey;
}
/// A storage slot key that tracks whether it holds a plain (unhashed) EVM slot
/// or a keccak256-hashed slot.
///
/// This enum replaces the `use_hashed_state: bool` parameter pattern by carrying
/// provenance with the key itself. Once tagged at a read/write boundary, downstream
/// code can call [`Self::to_hashed`] without risk of double-hashing — hashing a
/// [`StorageSlotKey::Hashed`] is a no-op.
///
/// The on-disk encoding is unchanged (raw 32-byte [`B256`]). The variant is set
/// by the code that knows the context (which table, which storage mode).
#[derive(Debug, Copy, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub enum StorageSlotKey {
/// An unhashed EVM storage slot, as produced by REVM execution.
Plain(B256),
/// A keccak256-hashed storage slot, as stored in `HashedStorages` and
/// in v2-mode `StorageChangeSets`.
Hashed(B256),
}
impl Default for StorageSlotKey {
fn default() -> Self {
Self::Plain(B256::ZERO)
}
}
impl StorageSlotKey {
/// Create a plain slot key from a REVM [`U256`] storage index.
pub const fn from_u256(slot: U256) -> Self {
Self::Plain(B256::new(slot.to_be_bytes()))
}
/// Create a plain slot key from a raw [`B256`].
pub const fn plain(key: B256) -> Self {
Self::Plain(key)
}
/// Create a hashed slot key from a raw [`B256`].
pub const fn hashed(key: B256) -> Self {
Self::Hashed(key)
}
/// Tag a raw [`B256`] based on the storage mode.
///
/// When `use_hashed_state` is true the key is assumed already hashed.
/// When false it is assumed to be a plain slot.
pub const fn from_raw(key: B256, use_hashed_state: bool) -> Self {
if use_hashed_state {
Self::Hashed(key)
} else {
Self::Plain(key)
}
}
/// Returns the raw [`B256`] regardless of variant.
pub const fn as_b256(&self) -> B256 {
match *self {
Self::Plain(b) | Self::Hashed(b) => b,
}
}
/// Returns `true` if this key is already hashed.
pub const fn is_hashed(&self) -> bool {
matches!(self, Self::Hashed(_))
}
/// Returns `true` if this key is plain (unhashed).
pub const fn is_plain(&self) -> bool {
matches!(self, Self::Plain(_))
}
/// Produce the keccak256-hashed form of this slot key.
///
/// - If already [`Hashed`](Self::Hashed), returns the inner value as-is (no double-hash).
/// - If [`Plain`](Self::Plain), applies keccak256 and returns the result.
pub fn to_hashed(&self) -> B256 {
match *self {
Self::Hashed(b) => b,
Self::Plain(b) => keccak256(b),
}
}
/// Convert a plain slot to its changeset representation.
///
/// In v2 mode (`use_hashed_state = true`), the changeset stores hashed keys,
/// so the plain key is hashed. In v1 mode, the plain key is stored as-is.
///
/// Panics (debug) if called on an already-hashed key.
pub fn to_changeset_key(self, use_hashed_state: bool) -> B256 {
debug_assert!(self.is_plain(), "to_changeset_key called on already-hashed key");
if use_hashed_state {
self.to_hashed()
} else {
self.as_b256()
}
}
/// Like [`to_changeset_key`](Self::to_changeset_key) but returns a tagged
/// [`StorageSlotKey`] instead of a raw [`B256`].
///
/// Panics (debug) if called on an already-hashed key.
pub fn to_changeset(self, use_hashed_state: bool) -> Self {
Self::from_raw(self.to_changeset_key(use_hashed_state), use_hashed_state)
}
}
impl From<StorageSlotKey> for B256 {
fn from(key: StorageSlotKey) -> Self {
key.as_b256()
}
}
/// Account storage entry.
///
/// `key` is the subkey when used as a value in the `StorageChangeSets` table.
@@ -142,14 +31,6 @@ impl StorageEntry {
pub const fn new(key: B256, value: U256) -> Self {
Self { key, value }
}
/// Tag this entry's key as a [`StorageSlotKey`] based on the storage mode.
///
/// When `use_hashed_state` is true, the key is tagged as already-hashed.
/// When false, it is tagged as plain.
pub const fn slot_key(&self, use_hashed_state: bool) -> StorageSlotKey {
StorageSlotKey::from_raw(self.key, use_hashed_state)
}
}
impl ValueWithSubKey for StorageEntry {

View File

@@ -6,106 +6,21 @@ homepage.workspace = true
license.workspace = true
repository.workspace = true
rust-version.workspace = true
description = "Commonly used types in reth."
description = "Deprecated: use reth-ethereum-primitives and reth-primitives-traits instead."
[lints]
workspace = true
[dependencies]
# reth
reth-ethereum-primitives = { workspace = true, features = ["serde"] }
reth-primitives-traits.workspace = true
reth-ethereum-forks.workspace = true
reth-static-file-types.workspace = true
# ethereum
alloy-consensus.workspace = true
alloy-primitives = { workspace = true, optional = true }
alloy-rlp = { workspace = true, optional = true }
alloy-eips = { workspace = true, optional = true }
alloy-genesis = { workspace = true, optional = true }
# for eip-4844
c-kzg = { workspace = true, features = ["serde"], optional = true }
# misc
once_cell.workspace = true
reth-codecs = { workspace = true, optional = true }
[dev-dependencies]
# eth
reth-primitives-traits = { workspace = true, features = ["arbitrary", "test-utils"] }
alloy-eips = { workspace = true, features = ["arbitrary"] }
arbitrary = { workspace = true, features = ["derive"] }
proptest-arbitrary-interop.workspace = true
proptest.workspace = true
criterion.workspace = true
[features]
default = ["c-kzg", "alloy-compat", "std", "reth-codec", "secp256k1"]
std = [
"reth-primitives-traits/std",
"alloy-consensus/std",
"alloy-eips/std",
"alloy-genesis/std",
"once_cell/std",
"reth-ethereum-forks/std",
"reth-ethereum-primitives/std",
"alloy-rlp/std",
"alloy-primitives/std",
"reth-static-file-types/std",
]
reth-codec = [
"std",
"reth-primitives-traits/reth-codec",
"reth-ethereum-primitives/reth-codec",
]
asm-keccak = [
"alloy-primitives/asm-keccak",
]
arbitrary = [
"alloy-eips/arbitrary",
"reth-codec",
"reth-ethereum-forks/arbitrary",
"reth-primitives-traits/arbitrary",
"alloy-consensus/arbitrary",
"reth-ethereum-primitives/arbitrary",
"reth-codecs/arbitrary",
"alloy-primitives/arbitrary",
"c-kzg?/arbitrary",
]
secp256k1 = [
"reth-primitives-traits/secp256k1",
]
c-kzg = [
"dep:c-kzg",
"alloy-consensus/kzg",
"alloy-eips/kzg",
]
# Internal feature to suppress the deprecation warning for reth workspace crates.
__internal = []
std = []
reth-codec = []
asm-keccak = []
arbitrary = []
secp256k1 = []
c-kzg = []
alloy-compat = []
test-utils = [
"reth-primitives-traits/test-utils",
"arbitrary",
"reth-codecs/test-utils",
"reth-ethereum-primitives/test-utils",
]
serde-bincode-compat = [
"alloy-eips/serde-bincode-compat",
"alloy-consensus/serde-bincode-compat",
"reth-primitives-traits/serde-bincode-compat",
"reth-ethereum-primitives/serde-bincode-compat",
"alloy-genesis/serde-bincode-compat",
]
[[bench]]
name = "recover_ecdsa_crit"
harness = false
[[bench]]
name = "validate_blob_tx"
required-features = ["arbitrary", "c-kzg"]
harness = false
test-utils = []
serde-bincode-compat = []

View File

@@ -1,26 +0,0 @@
#![allow(missing_docs)]
use alloy_consensus::transaction::SignerRecoverable;
use alloy_primitives::hex_literal::hex;
use alloy_rlp::Decodable;
use criterion::{criterion_group, criterion_main, Criterion};
use reth_ethereum_primitives::TransactionSigned;
/// Benchmarks the recovery of the public key from the ECDSA message using criterion.
pub fn criterion_benchmark(c: &mut Criterion) {
c.bench_function("recover ECDSA", |b| {
b.iter(|| {
let raw =hex!("f88b8212b085028fa6ae00830f424094aad593da0c8116ef7d2d594dd6a63241bccfc26c80a48318b64b000000000000000000000000641c5d790f862a58ec7abcfd644c0442e9c201b32aa0a6ef9e170bca5ffb7ac05433b13b7043de667fbb0b4a5e45d3b54fb2d6efcc63a0037ec2c05c3d60c5f5f78244ce0a3859e3a18a36c61efb061b383507d3ce19d2");
let mut pointer = raw.as_ref();
let tx = TransactionSigned::decode(&mut pointer).unwrap();
SignerRecoverable::recover_signer(&tx).unwrap();
}
)
});
}
criterion_group! {
name = benches;
config = Criterion::default();
targets = criterion_benchmark
}
criterion_main!(benches);

View File

@@ -1,70 +0,0 @@
#![allow(missing_docs)]
use alloy_consensus::TxEip4844;
use alloy_eips::eip4844::{
env_settings::EnvKzgSettings, BlobTransactionSidecar, MAX_BLOBS_PER_BLOCK_DENCUN,
};
use criterion::{
criterion_group, criterion_main, measurement::WallTime, BenchmarkGroup, Criterion,
};
use proptest::{prelude::*, strategy::ValueTree, test_runner::TestRunner};
use proptest_arbitrary_interop::arb;
/// Benchmarks EIP-4844 blob validation.
fn blob_validation(c: &mut Criterion) {
let mut group = c.benchmark_group("Blob Transaction KZG validation");
for num_blobs in 1..=MAX_BLOBS_PER_BLOCK_DENCUN {
println!("Benchmarking validation for tx with {num_blobs} blobs");
validate_blob_tx(&mut group, "ValidateBlob", num_blobs as u64, EnvKzgSettings::Default);
}
}
fn validate_blob_tx(
group: &mut BenchmarkGroup<'_, WallTime>,
description: &str,
num_blobs: u64,
kzg_settings: EnvKzgSettings,
) {
let setup = || {
let mut runner = TestRunner::deterministic();
// generate tx and sidecar
let mut tx = arb::<TxEip4844>().new_tree(&mut runner).unwrap().current();
let mut blob_sidecar =
arb::<BlobTransactionSidecar>().new_tree(&mut runner).unwrap().current();
while blob_sidecar.blobs.len() < num_blobs as usize {
let blob_sidecar_ext =
arb::<BlobTransactionSidecar>().new_tree(&mut runner).unwrap().current();
// extend the sidecar with the new blobs
blob_sidecar.blobs.extend(blob_sidecar_ext.blobs);
blob_sidecar.proofs.extend(blob_sidecar_ext.proofs);
blob_sidecar.commitments.extend(blob_sidecar_ext.commitments);
}
// ensure exactly num_blobs blobs
blob_sidecar.blobs.truncate(num_blobs as usize);
blob_sidecar.proofs.truncate(num_blobs as usize);
blob_sidecar.commitments.truncate(num_blobs as usize);
tx.blob_versioned_hashes = blob_sidecar.versioned_hashes().collect();
(tx, blob_sidecar)
};
let group_id = format!("validate_blob | num blobs: {num_blobs} | {description}");
let kzg_settings = kzg_settings.get();
// for now we just use the default SubPoolLimit
group.bench_function(group_id, |b| {
b.iter_with_setup(setup, |(tx, blob_sidecar)| {
let r = tx.validate_blob(&blob_sidecar, kzg_settings);
(r, tx, blob_sidecar)
});
});
}
criterion_group!(validate_blob, blob_validation);
criterion_main!(validate_blob);

View File

@@ -1,29 +0,0 @@
use alloy_consensus::Header;
use reth_ethereum_primitives::TransactionSigned;
#[cfg(any(test, feature = "arbitrary"))]
pub use reth_primitives_traits::test_utils::{generate_valid_header, valid_header_strategy};
/// Ethereum full block.
///
/// Withdrawals can be optionally included at the end of the RLP encoded message.
pub type Block<T = TransactionSigned, H = Header> = alloy_consensus::Block<T, H>;
/// A response to `GetBlockBodies`, containing bodies if any bodies were found.
///
/// Withdrawals can be optionally included at the end of the RLP encoded message.
pub type BlockBody<T = TransactionSigned, H = Header> = alloy_consensus::BlockBody<T, H>;
/// Ethereum sealed block type
pub type SealedBlock<B = Block> = reth_primitives_traits::block::SealedBlock<B>;
/// Helper type for constructing the block
#[deprecated(note = "Use `SealedBlock` instead")]
pub type SealedBlockFor<B = Block> = reth_primitives_traits::block::SealedBlock<B>;
/// Ethereum recovered block
#[deprecated(note = "Use `RecoveredBlock` instead")]
pub type BlockWithSenders<B = Block> = reth_primitives_traits::block::RecoveredBlock<B>;
/// Ethereum recovered block
#[deprecated(note = "Use `RecoveredBlock` instead")]
pub type SealedBlockWithSenders<B = Block> = reth_primitives_traits::block::RecoveredBlock<B>;

View File

@@ -1,86 +1,20 @@
//! Commonly used types in Reth.
//!
//! This crate contains Ethereum primitive types and helper functions.
//! ## Deprecation Notice
//!
//! ## Feature Flags
//!
//! - `arbitrary`: Adds `proptest` and `arbitrary` support for primitive types.
//! - `test-utils`: Export utilities for testing
//! - `reth-codec`: Enables db codec support for reth types including zstd compression for certain
//! types.
//! This crate is deprecated and will be removed in a future release.
//! Use [`reth-ethereum-primitives`](https://crates.io/crates/reth-ethereum-primitives) and
//! [`reth-primitives-traits`](https://crates.io/crates/reth-primitives-traits) instead.
#![cfg_attr(
not(feature = "__internal"),
deprecated(
note = "the `reth-primitives` crate is deprecated, use `reth-ethereum-primitives` and `reth-primitives-traits` instead."
)
)]
#![doc(
html_logo_url = "https://raw.githubusercontent.com/paradigmxyz/reth/main/assets/reth-docs.png",
html_favicon_url = "https://avatars0.githubusercontent.com/u/97369466?s=256",
issue_tracker_base_url = "https://github.com/paradigmxyz/reth/issues/"
)]
#![cfg_attr(not(test), warn(unused_crate_dependencies))]
#![cfg_attr(docsrs, feature(doc_cfg))]
#![cfg_attr(not(feature = "std"), no_std)]
// These are used as optional dependencies solely for feature forwarding.
#[cfg(feature = "alloy-eips")]
use alloy_eips as _;
#[cfg(feature = "alloy-genesis")]
use alloy_genesis as _;
#[cfg(feature = "alloy-primitives")]
use alloy_primitives as _;
#[cfg(feature = "alloy-rlp")]
use alloy_rlp as _;
#[cfg(feature = "reth-codecs")]
use reth_codecs as _;
mod block;
mod receipt;
pub use reth_static_file_types as static_file;
pub mod transaction;
#[cfg(any(test, feature = "arbitrary"))]
pub use block::{generate_valid_header, valid_header_strategy};
pub use block::{Block, BlockBody, SealedBlock};
#[expect(deprecated)]
pub use block::{BlockWithSenders, SealedBlockFor, SealedBlockWithSenders};
pub use receipt::{gas_spent_by_transactions, Receipt};
pub use reth_primitives_traits::{
logs_bloom, Account, BlockTy, BodyTy, Bytecode, GotExpected, GotExpectedBoxed, Header,
HeaderTy, Log, LogData, NodePrimitives, ReceiptTy, RecoveredBlock, SealedHeader, StorageEntry,
TxTy,
};
pub use static_file::StaticFileSegment;
pub use alloy_consensus::{
transaction::{PooledTransaction, Recovered, TransactionMeta},
ReceiptWithBloom,
};
/// Recovered transaction
#[deprecated(note = "use `Recovered` instead")]
pub type RecoveredTx<T> = Recovered<T>;
pub use transaction::{
util::secp256k1::{public_key_to_address, recover_signer_unchecked, sign_message},
InvalidTransactionError, Transaction, TransactionSigned, TxType,
};
#[expect(deprecated)]
pub use transaction::{PooledTransactionsElementEcRecovered, TransactionSignedEcRecovered};
// Re-exports
pub use reth_ethereum_forks::*;
#[cfg(feature = "c-kzg")]
pub use c_kzg as kzg;
/// Bincode-compatible serde implementations for commonly used types in Reth.
///
/// `bincode` crate doesn't work with optionally serializable serde fields, but some of the
/// Reth types require optional serialization for RPC compatibility. This module makes so that
/// all fields are serialized.
///
/// Read more: <https://github.com/bincode-org/bincode/issues/326>
#[cfg(feature = "serde-bincode-compat")]
pub mod serde_bincode_compat {
pub use reth_primitives_traits::serde_bincode_compat::*;
}
// Re-export of `EthPrimitives`
pub use reth_ethereum_primitives::EthPrimitives;

View File

@@ -1,5 +0,0 @@
/// Retrieves gas spent by transactions as a vector of tuples (transaction index, gas used).
pub use reth_primitives_traits::receipt::gas_spent_by_transactions;
/// Receipt containing result of transaction execution.
pub use reth_ethereum_primitives::Receipt;

View File

@@ -1,34 +0,0 @@
//! Transaction types.
use crate::Recovered;
pub use alloy_consensus::transaction::PooledTransaction;
use once_cell as _;
#[expect(deprecated)]
pub use pooled::PooledTransactionsElementEcRecovered;
pub use reth_primitives_traits::{
sync::{LazyLock, OnceLock},
transaction::{
error::{
InvalidTransactionError, TransactionConversionError, TryFromRecoveredTransactionError,
},
signed::SignedTransaction,
},
FillTxEnv, WithEncoded,
};
pub use signature::{recover_signer, recover_signer_unchecked};
pub use tx_type::TxType;
/// Handling transaction signature operations, including signature recovery,
/// applying chain IDs, and EIP-2 validation.
pub mod signature;
pub mod util;
mod pooled;
mod tx_type;
/// Signed transaction.
pub use reth_ethereum_primitives::{Transaction, TransactionSigned};
/// Type alias kept for backward compatibility.
#[deprecated(note = "Use `Recovered` instead")]
pub type TransactionSignedEcRecovered<T = TransactionSigned> = Recovered<T>;

View File

@@ -1,9 +0,0 @@
//! Defines the types for blob transactions, legacy, and other EIP-2718 transactions included in a
//! response to `GetPooledTransactions`.
use crate::Recovered;
use alloy_consensus::transaction::PooledTransaction;
/// A signed pooled transaction with recovered signer.
#[deprecated(note = "use `Recovered` instead")]
pub type PooledTransactionsElementEcRecovered<T = PooledTransaction> = Recovered<T>;

View File

@@ -1 +0,0 @@
pub use reth_primitives_traits::crypto::secp256k1::{recover_signer, recover_signer_unchecked};

View File

@@ -1,8 +0,0 @@
/// Transaction Type
///
/// Currently being used as 2-bit type when encoding it to `reth_codecs::Compact` on
/// [`crate::TransactionSigned`]. Adding more transaction types will break the codec and
/// database format.
///
/// Other required changes when adding a new type can be seen on [PR#3953](https://github.com/paradigmxyz/reth/pull/3953/files).
pub use alloy_consensus::TxType;

View File

@@ -1,3 +0,0 @@
//! Utility functions for signature.
pub use reth_primitives_traits::crypto::*;

View File

@@ -135,7 +135,7 @@ impl StorageHistory {
let (block_address, entry) = result?;
let block_number = block_address.block_number();
let address = block_address.address();
highest_deleted_storages.insert((address, entry.key.as_b256()), block_number);
highest_deleted_storages.insert((address, entry.key), block_number);
last_changeset_pruned_block = Some(block_number);
pruned_changesets += 1;
limiter.increment_deleted_entries_count();
@@ -273,7 +273,7 @@ impl StorageHistory {
let (block_address, entry) = result?;
let block_number = block_address.block_number();
let address = block_address.address();
highest_deleted_storages.insert((address, entry.key.as_b256()), block_number);
highest_deleted_storages.insert((address, entry.key), block_number);
last_changeset_pruned_block = Some(block_number);
changesets_processed += 1;
limiter.increment_deleted_entries_count();

View File

@@ -160,14 +160,6 @@ impl StateProvider for StateProviderTest {
) -> ProviderResult<Option<alloy_primitives::StorageValue>> {
Ok(self.accounts.get(&account).and_then(|(storage, _)| storage.get(&storage_key).copied()))
}
fn storage_by_hashed_key(
&self,
_address: Address,
_hashed_storage_key: StorageKey,
) -> ProviderResult<Option<alloy_primitives::StorageValue>> {
Ok(None)
}
}
impl BytecodeReader for StateProviderTest {

View File

@@ -48,7 +48,7 @@ pub mod servers {
net::NetApiServer,
otterscan::OtterscanServer,
reth::RethApiServer,
reth_engine::{RethEngineApiServer, RethPayloadStatus},
reth_engine::{RethEngineApiServer, RethNewPayloadInput, RethPayloadStatus},
rpc::RpcApiServer,
testing::TestingApiServer,
trace::TraceApiServer,

View File

@@ -1,5 +1,5 @@
use alloy_eips::BlockId;
use alloy_primitives::{map::AddressMap, U256};
use alloy_primitives::{map::AddressMap, U256, U64};
use jsonrpsee::{core::RpcResult, proc_macros::rpc};
// Required for the subscription attributes below
@@ -16,6 +16,18 @@ pub trait RethApi {
block_id: BlockId,
) -> RpcResult<AddressMap<U256>>;
/// Re-executes a block (or a range of blocks) and returns the execution outcome including
/// receipts, state changes, and EIP-7685 requests.
///
/// If `count` is provided, re-executes `count` consecutive blocks starting from `block_id`
/// and returns the merged execution outcome.
#[method(name = "getBlockExecutionOutcome")]
async fn reth_get_block_execution_outcome(
&self,
block_id: BlockId,
count: Option<U64>,
) -> RpcResult<Option<serde_json::Value>>;
/// Subscribe to json `ChainNotifications`
#[subscription(
name = "subscribeChainNotifications",

View File

@@ -1,6 +1,7 @@
//! Reth-specific engine API extensions.
use alloy_rpc_types_engine::{ExecutionData, PayloadStatus};
use alloy_primitives::Bytes;
use alloy_rpc_types_engine::{ForkchoiceState, ForkchoiceUpdated, PayloadStatus};
use jsonrpsee::{core::RpcResult, proc_macros::rpc};
use serde::{Deserialize, Serialize};
@@ -22,18 +23,42 @@ pub struct RethPayloadStatus {
pub sparse_trie_wait_us: u64,
}
/// Input for `reth_newPayload` that accepts either `ExecutionData` directly or an RLP-encoded
/// block.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(untagged)]
pub enum RethNewPayloadInput<ExecutionData> {
/// Standard execution data (payload + sidecar).
ExecutionData(ExecutionData),
/// An RLP-encoded block.
BlockRlp(Bytes),
}
/// Reth-specific engine API extensions.
///
/// This trait provides a `reth_newPayload` endpoint that takes `ExecutionData` directly
/// (payload + sidecar), waiting for persistence and cache locks before processing.
/// This trait provides a `reth_newPayload` endpoint that accepts either `ExecutionData` directly
/// (payload + sidecar) or an RLP-encoded block, waiting for persistence and cache locks before
/// processing.
///
/// Responses include timing breakdowns with server-measured execution latency.
#[cfg_attr(not(feature = "client"), rpc(server, namespace = "reth"))]
#[cfg_attr(feature = "client", rpc(server, client, namespace = "reth"))]
pub trait RethEngineApi {
/// Reth-specific newPayload that takes `ExecutionData` directly.
pub trait RethEngineApi<ExecutionData> {
/// Reth-specific newPayload that accepts either `ExecutionData` directly or an RLP-encoded
/// block.
///
/// Waits for persistence, execution cache, and sparse trie locks before processing.
#[method(name = "newPayload")]
async fn reth_new_payload(&self, payload: ExecutionData) -> RpcResult<RethPayloadStatus>;
async fn reth_new_payload(
&self,
payload: RethNewPayloadInput<ExecutionData>,
) -> RpcResult<RethPayloadStatus>;
/// Reth-specific forkchoiceUpdated that sends a regular forkchoice update with no payload
/// attributes.
#[method(name = "forkchoiceUpdated")]
async fn reth_forkchoice_updated(
&self,
forkchoice_state: ForkchoiceState,
) -> RpcResult<ForkchoiceUpdated>;
}

View File

@@ -18,6 +18,8 @@ reth-ipc.workspace = true
reth-chainspec.workspace = true
reth-consensus.workspace = true
reth-engine-primitives.workspace = true
reth-rpc-engine-api.workspace = true
reth-payload-primitives.workspace = true
reth-network-api.workspace = true
reth-node-core.workspace = true
reth-rpc.workspace = true

View File

@@ -337,8 +337,15 @@ impl AuthServerHandle {
/// Tell the server to stop without waiting for the server to stop.
pub fn stop(self) -> Result<(), AlreadyStoppedError> {
let Some(handle) = self.handle else { return Ok(()) };
handle.stop()
if let Some(handle) = self.handle {
handle.stop()?;
}
if let Some(ipc_handle) = self.ipc_handle {
ipc_handle.stop()?;
}
Ok(())
}
/// Returns the url to the http server

View File

@@ -32,15 +32,17 @@ use jsonrpsee::{
};
use reth_chainspec::{ChainSpecProvider, EthereumHardforks};
use reth_consensus::FullConsensus;
use reth_engine_primitives::ConsensusEngineEvent;
use reth_engine_primitives::{ConsensusEngineEvent, ConsensusEngineHandle};
use reth_evm::ConfigureEvm;
use reth_network_api::{noop::NoopNetwork, NetworkInfo, Peers};
use reth_payload_primitives::PayloadTypes;
use reth_primitives_traits::{NodePrimitives, TxTy};
use reth_rpc::{
AdminApi, DebugApi, EngineEthApi, EthApi, EthApiBuilder, EthBundle, MinerApi, NetApi,
OtterscanApi, RPCApi, RethApi, TraceApi, TxPoolApi, Web3Api,
};
use reth_rpc_api::servers::*;
use reth_rpc_engine_api::RethEngineApi;
use reth_rpc_eth_api::{
helpers::{
pending_block::PendingEnvBuilder, Call, EthApiSpec, EthTransactions, LoadPendingBlock,
@@ -327,12 +329,13 @@ where
/// This behaves exactly as [`RpcModuleBuilder::build`] for the [`TransportRpcModules`], but
/// also configures the auth (engine api) server, which exposes a subset of the `eth_`
/// namespace.
pub fn build_with_auth_server<EthApi>(
pub fn build_with_auth_server<EthApi, Payload>(
self,
module_config: TransportRpcModuleConfig,
engine: impl IntoEngineApiRpcModule,
eth: EthApi,
engine_events: EventSender<ConsensusEngineEvent<N>>,
beacon_engine_handle: ConsensusEngineHandle<Payload>,
) -> (
TransportRpcModules,
AuthRpcModule,
@@ -340,12 +343,13 @@ where
)
where
EthApi: FullEthApiServer<Provider = Provider, Pool = Pool>,
Payload: PayloadTypes,
{
let config = module_config.config.clone().unwrap_or_default();
let mut registry = self.into_registry(config, eth, engine_events);
let modules = registry.create_transport_rpc_modules(module_config);
let auth_module = registry.create_auth_module(engine);
let auth_module = registry.create_auth_module(engine, beacon_engine_handle);
(modules, auth_module, registry)
}
@@ -847,8 +851,13 @@ where
}
/// Instantiates `RethApi`
pub fn reth_api(&self) -> RethApi<Provider> {
RethApi::new(self.provider.clone(), self.executor.clone())
pub fn reth_api(&self) -> RethApi<Provider, EvmConfig> {
RethApi::new(
self.provider.clone(),
self.evm_config.clone(),
self.blocking_pool_guard.clone(),
self.executor.clone(),
)
}
}
@@ -870,12 +879,26 @@ where
{
/// Configures the auth module that includes the
/// * `engine_` namespace
/// * `reth_` namespace
/// * `api_` namespace
///
/// Note: This does _not_ register the `engine_` in this registry.
pub fn create_auth_module(&self, engine_api: impl IntoEngineApiRpcModule) -> AuthRpcModule {
pub fn create_auth_module<Payload>(
&self,
engine_api: impl IntoEngineApiRpcModule,
beacon_engine_handle: ConsensusEngineHandle<Payload>,
) -> AuthRpcModule
where
Payload: PayloadTypes,
{
let mut module = engine_api.into_rpc_module();
// Merge reth_* endpoints
let reth_engine_api = RethEngineApi::new(beacon_engine_handle);
module
.merge(RethEngineApiServer::into_rpc(reth_engine_api).remove_context())
.expect("No conflicting methods");
// also merge a subset of `eth_` handlers
let eth_handlers = self.eth_handlers();
let engine_eth = EngineEthApi::new(eth_handlers.api.clone(), eth_handlers.filter.clone());
@@ -1001,11 +1024,14 @@ where
.into_rpc()
.into(),
RethRpcModule::Ots => OtterscanApi::new(eth_api.clone()).into_rpc().into(),
RethRpcModule::Reth => {
RethApi::new(self.provider.clone(), self.executor.clone())
.into_rpc()
.into()
}
RethRpcModule::Reth => RethApi::new(
self.provider.clone(),
self.evm_config.clone(),
self.blocking_pool_guard.clone(),
self.executor.clone(),
)
.into_rpc()
.into(),
RethRpcModule::Miner => MinerApi::default().into_rpc().into(),
RethRpcModule::Mev => {
EthSimBundle::new(eth_api.clone(), self.blocking_pool_guard.clone())

Some files were not shown because too many files have changed in this diff Show More