Compare commits

..

37 Commits

Author SHA1 Message Date
rakita
7b2c458302 feat(pool): derive EIP-8037 CPSB from block gas limit in ensure_intrinsic_gas
Thread the current block gas limit through ensure_intrinsic_gas and
compute cost_per_state_byte via revm_primitives::eip8037 instead of
hard-coding 0. Pool callers already have block_gas_limit on hand.
2026-04-27 08:41:33 +02:00
rakita
0722202930 chore: integrate revm devnet4 + paired forks (rev fe2549d8)
- revm: fe2549d85fb9e201e7b629f8b47bcca46d49aa1d
- revm-inspectors: a2c7a41977b468d016a339f560acb76e002766f3
- alloy-evm: da7633f6bc9554f5a6e60773ef21b8e9d6e0cca6

Adapt to revm Account: original_info is now a private
Option<Box<AccountInfo>>; build accounts via Account::default()
in the engine tree payload processor test fixture.
2026-04-27 00:44:33 +02:00
rakita
8ec6e614f9 chore: integrate revm devnet4 + paired forks (rev 7a2de5a4)
Patch revm to devnet4 (EIP-8037 dynamic CPSB, EIP-7981 access list cost
increase, EIP-7976 calldata floor cost bump, EIP-8037 reservoir refill)
along with the corresponding devnet4 commits of revm-inspectors,
alloy-evm, and reth-core.

Pin revm/revm-inspectors workspace deps to exact `=X.Y.Z` versions so
`[patch.crates-io]` reliably wins over the slightly higher published
versions (e.g. 13.0.1) currently on crates.io.

Pass cpsb=0 to `calculate_initial_tx_gas` from the txpool validator —
the new EIP-8037 storage-cap argument is irrelevant pre-Amsterdam, and
the pool fork tracker tops out at Prague.
2026-04-26 22:45:06 +02:00
Karl Yu
2c86c0b876 feat(network): add BAL request e2e coverage (#23727) 2026-04-26 18:38:30 +00:00
CPerezz
bd4cd28a8d fix(cli): avoid u64 underflow in setup_without_evm for genesis-block header (#23728) 2026-04-26 14:22:45 +00:00
Karl Yu
6fa48a497a feat(net): enforce BAL response soft limit (#23725) 2026-04-26 05:29:28 +00:00
Arsenii Kulikov
6886cd7742 feat(re-execute): verify reverts against changesets (#23717) 2026-04-25 16:46:35 +00:00
Karl Yu
eeb223f0b8 feat(net): add Basic in-memory BAL store (#23710) 2026-04-25 11:45:29 +00:00
Alexey Shekhirin
f344f5abfb bench: enable keccak-cache-global feature in reth-bb binary (#23723) 2026-04-25 11:19:23 +00:00
JOJO
68845d1114 fix(rpc): include block numbers in BlockRangeExceedsHead error (#23720) 2026-04-25 05:44:50 +00:00
Brian Picciano
ecfb6cc089 fix(ci): clean bench checkouts and lock cargo builds (#23708)
Co-authored-by: Brian Picciano <933154+mediocregopher@users.noreply.github.com>
Co-authored-by: Amp <amp@ampcode.com>
2026-04-25 05:39:38 +00:00
Alexey Shekhirin
b271694301 perf(revm): enable p256-aws-lc-rs feature (#23721) 2026-04-24 20:36:22 +00:00
romanbrodetski-ai
41c68729ab fix(discv5): use Weak reference in kbuckets bg task to release port on shutdown (#23282)
Co-authored-by: romanbrodetski-ai <romanbrodetski-ai@users.noreply.github.com>
2026-04-24 16:58:03 +00:00
Arsenii Kulikov
79578e35b8 feat: avoid RLP-decoding NewBlock payloads (#23712) 2026-04-24 16:04:29 +00:00
Ishika Choudhury
e4f14b2ae1 chore: added empty request check to storage values (#23714) 2026-04-24 16:01:30 +00:00
Matthias Seitz
05e6da66e1 chore(engine): log transient invalid header cache skips (#23711) 2026-04-24 13:22:35 +00:00
Matthias Seitz
6be5520e34 fix(net): respect peer requirements for fetch followups (#23706) 2026-04-24 11:01:24 +00:00
Brian Picciano
d29db3b765 feat(bench): add reorg mode to new-payload-fcu (#23666)
Co-authored-by: Brian Picciano <933154+mediocregopher@users.noreply.github.com>
Co-authored-by: Amp <amp@ampcode.com>
2026-04-24 10:51:40 +00:00
Matthias Seitz
40c30dbc73 chore(db): derive Eq for IntegerList (#23709) 2026-04-24 12:45:05 +02:00
Ishika Choudhury
5c383818a6 chore: reth core bumped to v0.3.1 (#23707)
Co-authored-by: Soubhik Singha Mahapatra <soubhiksmp2004@gmail.com>
2026-04-24 10:23:09 +00:00
Karl Yu
cf6ffb1599 feat(net): add BAL requirement to block access list requests (#23682)
Co-authored-by: Matthias Seitz <matthias.seitz@outlook.de>
2026-04-24 08:38:37 +00:00
Matthias Seitz
ba3cd2872a fix(net): retain active session buffer capacity (#23702) 2026-04-24 07:23:22 +00:00
JOJO
4f9af7c16a fix(cli): preserve trusted_nodes_only from config when --trusted-only is not set (#23703) 2026-04-24 07:20:11 +00:00
Veronica Hayes
13c5504aa2 fix(cli): use node types in execution stage dump (#23705) 2026-04-24 07:13:25 +00:00
Arsenii Kulikov
fa6b44b038 perf(re-execute): configurable rocksdb block cache size and re-use of mdbx provider (#23701)
Co-authored-by: Amp <amp@ampcode.com>
2026-04-23 23:03:43 +00:00
Brian Picciano
6377a957c1 refactor(provider): use overlay builders in historical state paths (#23667)
Co-authored-by: Brian Picciano <933154+mediocregopher@users.noreply.github.com>
Co-authored-by: Amp <amp@ampcode.com>
2026-04-23 19:05:55 +00:00
Ishika Choudhury
378d4052ee chore(rpc): pass block timestamp to txn (#23700)
Co-authored-by: Soubhik Singha Mahapatra <soubhiksmp2004@gmail.com>
2026-04-23 20:48:04 +02:00
Emma Jamieson-Hoare
62d99888d2 fix(db): move unix deps section after strum in Cargo.toml (#23697)
Co-authored-by: Amp <amp@ampcode.com>
2026-04-23 14:08:15 +00:00
dependabot[bot]
73f5d77b51 chore(deps): bump actions/setup-python from 5 to 6 (#23689)
Signed-off-by: dependabot[bot] <support@github.com>
Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com>
2026-04-23 11:42:47 +00:00
figtracer
b62f71977a fix(era): align ERA1 export with spec (#23693) 2026-04-23 11:09:13 +00:00
JOJO
ad27be67be fix(net): track unknown tx types in announcement metrics (#23688)
Co-authored-by: Matthias Seitz <matthias.seitz@outlook.de>
2026-04-23 10:44:22 +00:00
Veronica Hayes
63f80907cc fix(cli): use TxTy and ReceiptTy for static-file db get (#23692) 2026-04-23 10:29:00 +00:00
Soubhik Singha Mahapatra
a57930481c chore: add DecodedBal in ExecutionEnv (#23675) 2026-04-23 09:48:50 +00:00
Matthias Seitz
bbcfe354a1 fix(rpc): clean up eth state cache reorg entries (#23683) 2026-04-23 07:03:57 +00:00
Arsenii Kulikov
7839f3d876 perf: avoid reopening .csoff on every changeset lookup (#23687) 2026-04-22 23:24:35 +00:00
Matthias Seitz
e89b4611e4 fix(engine): configure invalid header cache hit eviction (#23670) 2026-04-22 20:20:41 +00:00
Emma Jamieson-Hoare
2b7d4b54d4 feat(p2p): Discv5 is enabled by default (#23686) 2026-04-22 17:49:52 +00:00
108 changed files with 3024 additions and 1229 deletions

View File

@@ -53,7 +53,7 @@ build_node_binary() {
# shellcheck disable=SC2086
RUSTFLAGS="-C target-cpu=native${EXTRA_RUSTFLAGS}" \
cargo build --profile profiling $NODE_PKG $workspace_arg $features_arg
cargo build --locked --profile profiling $NODE_PKG $workspace_arg $features_arg
}
case "$MODE" in

View File

@@ -28,127 +28,6 @@ mkdir -p "$OUTPUT_DIR"
LOG="${OUTPUT_DIR}/node.log"
RETH_SCOPE="${RETH_SCOPE:-reth-bench.scope}"
DELAY_DM_NAME=""
mount_fs() {
local source="$1"
local target="$2"
local fstype="$3"
local options="$4"
local -a mount_cmd=(sudo mount -t "$fstype")
if [ -n "$options" ]; then
mount_cmd+=(-o "$options")
fi
mount_cmd+=("$source" "$target")
"${mount_cmd[@]}"
}
wait_for_block_device() {
local device="$1"
local timeout_s="${2:-5}"
for _ in $(seq 1 "$timeout_s"); do
if [ -b "$device" ]; then
return 0
fi
sleep 1
done
return 1
}
resolve_dm_block_device() {
local device_name="$1"
local devno
devno="$(sudo dmsetup info -C --noheadings --separator : -o major,minor "$device_name" 2>/dev/null | tr -d '[:space:]')"
if [ -z "$devno" ]; then
return 1
fi
printf '/dev/block/%s\n' "$devno"
}
setup_delay_target() {
local read_delay_ms="${BENCH_DISK_READ_DELAY_MS:-0}"
local write_delay_ms="${BENCH_DISK_WRITE_DELAY_MS:-0}"
read_delay_ms="${read_delay_ms:-0}"
write_delay_ms="${write_delay_ms:-0}"
if ! [[ "$read_delay_ms" =~ ^[0-9]+$ && "$write_delay_ms" =~ ^[0-9]+$ ]]; then
echo "::error::BENCH_DISK_READ_DELAY_MS and BENCH_DISK_WRITE_DELAY_MS must be non-negative integers"
exit 1
fi
if [ "$read_delay_ms" = "0" ] && [ "$write_delay_ms" = "0" ]; then
return
fi
local mount_source
local mount_fstype
local mount_options
mount_source="$(findmnt -no SOURCE --target "$SCHELK_MOUNT")"
mount_fstype="$(findmnt -no FSTYPE --target "$SCHELK_MOUNT")"
mount_options="$(findmnt -no OPTIONS --target "$SCHELK_MOUNT")"
local sectors
sectors="$(sudo blockdev --getsz "$mount_source")"
DELAY_DM_NAME="reth-bench-delay-${LABEL}-$$"
local delayed_device=""
local table="0 ${sectors} delay ${mount_source} 0 ${read_delay_ms} ${mount_source} 0 ${write_delay_ms}"
echo "Applying dm-delay to ${mount_source}: read=${read_delay_ms}ms write=${write_delay_ms}ms"
sudo umount "$SCHELK_MOUNT"
if ! sudo dmsetup create "$DELAY_DM_NAME" --addnodeoncreate --table "$table"; then
DELAY_DM_NAME=""
mount_fs "$mount_source" "$SCHELK_MOUNT" "$mount_fstype" "$mount_options"
echo "::error::Failed to create dm-delay target"
exit 1
fi
delayed_device="$(resolve_dm_block_device "$DELAY_DM_NAME" || true)"
if [ -z "$delayed_device" ]; then
local failed_device_name="$DELAY_DM_NAME"
sudo dmsetup remove "$DELAY_DM_NAME" || true
DELAY_DM_NAME=""
mount_fs "$mount_source" "$SCHELK_MOUNT" "$mount_fstype" "$mount_options"
echo "::error::Failed to resolve delayed device node for ${failed_device_name}"
exit 1
fi
if ! wait_for_block_device "$delayed_device"; then
delayed_device="$(resolve_dm_block_device "$DELAY_DM_NAME" || true)"
if ! wait_for_block_device "$delayed_device"; then
sudo dmsetup remove "$DELAY_DM_NAME" || true
DELAY_DM_NAME=""
mount_fs "$mount_source" "$SCHELK_MOUNT" "$mount_fstype" "$mount_options"
echo "::error::Delayed device node did not appear: ${delayed_device}"
exit 1
fi
fi
if ! mount_fs "$delayed_device" "$SCHELK_MOUNT" "$mount_fstype" "$mount_options"; then
sudo dmsetup remove "$DELAY_DM_NAME" || true
DELAY_DM_NAME=""
mount_fs "$mount_source" "$SCHELK_MOUNT" "$mount_fstype" "$mount_options"
echo "::error::Failed to remount delayed device"
exit 1
fi
}
teardown_delay_target() {
if [ -z "$DELAY_DM_NAME" ]; then
return
fi
echo "Removing dm-delay target ${DELAY_DM_NAME}"
if mountpoint -q "$SCHELK_MOUNT"; then
sudo umount "$SCHELK_MOUNT" || true
fi
sudo dmsetup remove "$DELAY_DM_NAME" || sudo dmsetup remove -f "$DELAY_DM_NAME" || true
DELAY_DM_NAME=""
}
cleanup() {
kill "$TAIL_PID" 2>/dev/null || true
@@ -197,7 +76,6 @@ cleanup() {
sudo systemctl reset-failed "$RETH_SCOPE" 2>/dev/null || true
# Fix ownership of reth-created files (reth runs as root)
sudo chown -R "$(id -un):$(id -gn)" "$OUTPUT_DIR" 2>/dev/null || true
teardown_delay_target
# Let schelk recover the mounted volume in place so dm-era can restore only
# the changed blocks and clean up its own state.
sudo schelk recover -y --kill || true
@@ -214,7 +92,6 @@ sudo schelk recover -y --kill || sudo schelk full-recover -y || true
# Mount
sudo schelk mount -y || true
setup_delay_target
if [ ! -d "$DATADIR/db" ] || [ ! -d "$DATADIR/static_files" ]; then
echo "::error::Failed to mount benchmark datadir at ${DATADIR}"
ls -la "$SCHELK_MOUNT" || true

View File

@@ -366,19 +366,24 @@ jobs:
- name: Prepare source dirs
run: |
if [ -d ../reth-baseline ]; then
git -C ../reth-baseline fetch origin "$BASELINE_REF"
else
git clone . ../reth-baseline
fi
git -C ../reth-baseline checkout "$BASELINE_REF"
prepare_source_dir() {
local dir="$1"
local ref="$2"
if [ -d ../reth-feature ]; then
git -C ../reth-feature fetch origin "$FEATURE_REF"
else
git clone . ../reth-feature
fi
git -C ../reth-feature checkout "$FEATURE_REF"
if [ -d "$dir" ]; then
git -C "$dir" reset --hard HEAD
git -C "$dir" clean -fdx
git -C "$dir" fetch origin "$ref"
else
git clone . "$dir"
fi
git -C "$dir" checkout --force "$ref"
}
prepare_source_dir ../reth-baseline "$BASELINE_REF"
prepare_source_dir ../reth-feature "$FEATURE_REF"
- name: Build binaries
id: build

View File

@@ -51,16 +51,6 @@ on:
required: false
default: ""
type: string
disk_read_delay_ms:
description: "Apply dm-delay read latency to the schelk volume (milliseconds)"
required: false
default: ""
type: string
disk_write_delay_ms:
description: "Apply dm-delay write latency to the schelk volume (milliseconds)"
required: false
default: ""
type: string
baseline_args:
description: "Extra CLI args for the baseline reth node"
required: false
@@ -135,8 +125,6 @@ jobs:
big-blocks: ${{ steps.args.outputs.big-blocks }}
bal: ${{ steps.args.outputs.bal }}
wait-time: ${{ steps.args.outputs.wait-time }}
disk-read-delay-ms: ${{ steps.args.outputs.disk-read-delay-ms }}
disk-write-delay-ms: ${{ steps.args.outputs.disk-write-delay-ms }}
baseline-args: ${{ steps.args.outputs.baseline-args }}
feature-args: ${{ steps.args.outputs.feature-args }}
abba: ${{ steps.args.outputs.abba }}
@@ -170,7 +158,7 @@ jobs:
script: |
const validBalModes = new Set(['false', 'true', 'feature', 'baseline']);
const validSlackModes = new Set(['always', 'on-win', 'on-error', 'never']);
const usage = '`@decofe bench [blocks=N] [big-blocks[=true|false]] [bal=true|false|feature|baseline] [warmup=N] [baseline=REF] [feature=REF] [samply] [slack=always|on-win|on-error|never] [cores=N] [abba=true|false] [otlp=true|false] [wait-time=DURATION] [disk-read-delay-ms=N] [disk-write-delay-ms=N] [baseline-args="..."] [feature-args="..."]`';
const usage = '`@decofe bench [blocks=N] [big-blocks[=true|false]] [bal=true|false|feature|baseline] [warmup=N] [baseline=REF] [feature=REF] [samply] [slack=always|on-win|on-error|never] [cores=N] [abba=true|false] [otlp=true|false] [wait-time=DURATION] [baseline-args="..."] [feature-args="..."]`';
let pr, actor, blocks, warmup, baseline, feature, samply, cores, bigBlocks, bal;
let explicitWarmup = false;
@@ -189,8 +177,6 @@ jobs:
var abba = '${{ github.event.inputs.abba }}' !== 'false' ? 'true' : 'false';
var otlp = '${{ github.event.inputs.otlp }}' !== 'false' ? 'true' : 'false';
var waitTime = '${{ github.event.inputs.wait_time }}' || '';
var diskReadDelayMs = '${{ github.event.inputs.disk_read_delay_ms }}' || '';
var diskWriteDelayMs = '${{ github.event.inputs.disk_write_delay_ms }}' || '';
var baselineNodeArgs = '${{ github.event.inputs.baseline_args }}' || '';
var featureNodeArgs = '${{ github.event.inputs.feature_args }}' || '';
@@ -218,9 +204,8 @@ jobs:
const boolDefaultTrue = new Set(['abba', 'otlp']);
const enumArgs = new Map([['bal', validBalModes], ['slack', validSlackModes]]);
const durationArgs = new Set(['wait-time']);
const optionalIntArgs = new Set(['disk-read-delay-ms', 'disk-write-delay-ms']);
const stringArgs = new Set(['baseline-args', 'feature-args']);
const defaults = { blocks: '500', warmup: '200', baseline: '', feature: '', samply: 'false', slack: 'always', 'big-blocks': 'false', bal: 'false', cores: '0', abba: 'true', otlp: 'true', 'wait-time': '', 'disk-read-delay-ms': '', 'disk-write-delay-ms': '', 'baseline-args': '', 'feature-args': '' };
const defaults = { blocks: '500', warmup: '200', baseline: '', feature: '', samply: 'false', slack: 'always', 'big-blocks': 'false', bal: 'false', cores: '0', abba: 'true', otlp: 'true', 'wait-time': '', 'baseline-args': '', 'feature-args': '' };
const unknown = [];
const invalid = [];
const args = body.replace(/^(?:@decofe|derek) bench\s*/, '');
@@ -259,12 +244,6 @@ jobs:
} else {
invalid.push(`\`${key}=${value}\` (must be a duration like 500ms, 1s, 2m)`);
}
} else if (optionalIntArgs.has(key)) {
if (!/^\d+$/.test(value)) {
invalid.push(`\`${key}=${value}\` (must be a non-negative integer in milliseconds)`);
} else {
defaults[key] = value;
}
} else if (enumArgs.has(key)) {
if (enumArgs.get(key).has(value)) {
defaults[key] = value;
@@ -316,8 +295,6 @@ jobs:
var abba = defaults.abba;
var otlp = defaults.otlp;
var waitTime = defaults['wait-time'];
var diskReadDelayMs = defaults['disk-read-delay-ms'];
var diskWriteDelayMs = defaults['disk-write-delay-ms'];
var baselineNodeArgs = defaults['baseline-args'];
var featureNodeArgs = defaults['feature-args'];
}
@@ -375,8 +352,6 @@ jobs:
core.setOutput('big-blocks', bigBlocks);
core.setOutput('bal', bal);
core.setOutput('wait-time', waitTime);
core.setOutput('disk-read-delay-ms', diskReadDelayMs);
core.setOutput('disk-write-delay-ms', diskWriteDelayMs);
core.setOutput('baseline-args', baselineNodeArgs);
core.setOutput('feature-args', featureNodeArgs);
core.setOutput('abba', abba);
@@ -452,16 +427,12 @@ jobs:
const otlpNote = !otlpEnabled ? ', otlp: `disabled`' : '';
const waitTimeVal = '${{ steps.args.outputs.wait-time }}';
const waitTimeNote = waitTimeVal ? `, wait-time: \`${waitTimeVal}\`` : '';
const diskReadDelayVal = '${{ steps.args.outputs.disk-read-delay-ms }}';
const diskReadDelayNote = diskReadDelayVal && diskReadDelayVal !== '0' ? `, disk-read-delay: \`${diskReadDelayVal}ms\`` : '';
const diskWriteDelayVal = '${{ steps.args.outputs.disk-write-delay-ms }}';
const diskWriteDelayNote = diskWriteDelayVal && diskWriteDelayVal !== '0' ? `, disk-write-delay: \`${diskWriteDelayVal}ms\`` : '';
const baselineArgsVal = '${{ steps.args.outputs.baseline-args }}';
const baselineArgsNote = baselineArgsVal ? `, baseline-args: \`${baselineArgsVal}\`` : '';
const featureArgsVal = '${{ steps.args.outputs.feature-args }}';
const featureArgsNote = featureArgsVal ? `, feature-args: \`${featureArgsVal}\`` : '';
const blocksDesc = bigBlocks ? 'blocks: `big`' : `${blocks} blocks, ${warmup} warmup blocks`;
const config = `**Config:** ${blocksDesc}, baseline: \`${baseline}\`, feature: \`${feature}\`${samplyNote}${slackNote}${balNote}${coresNote}${abbaNote}${otlpNote}${waitTimeNote}${diskReadDelayNote}${diskWriteDelayNote}${baselineArgsNote}${featureArgsNote}`;
const config = `**Config:** ${blocksDesc}, baseline: \`${baseline}\`, feature: \`${feature}\`${samplyNote}${slackNote}${balNote}${coresNote}${abbaNote}${otlpNote}${waitTimeNote}${baselineArgsNote}${featureArgsNote}`;
const { data: comment } = await github.rest.issues.createComment({
owner: context.repo.owner,
@@ -500,16 +471,12 @@ jobs:
const otlpNote = !otlpEnabled ? ', otlp: `disabled`' : '';
const waitTimeVal = '${{ steps.args.outputs.wait-time }}';
const waitTimeNote = waitTimeVal ? `, wait-time: \`${waitTimeVal}\`` : '';
const diskReadDelayVal = '${{ steps.args.outputs.disk-read-delay-ms }}';
const diskReadDelayNote = diskReadDelayVal && diskReadDelayVal !== '0' ? `, disk-read-delay: \`${diskReadDelayVal}ms\`` : '';
const diskWriteDelayVal = '${{ steps.args.outputs.disk-write-delay-ms }}';
const diskWriteDelayNote = diskWriteDelayVal && diskWriteDelayVal !== '0' ? `, disk-write-delay: \`${diskWriteDelayVal}ms\`` : '';
const baselineArgsVal = '${{ steps.args.outputs.baseline-args }}';
const baselineArgsNote = baselineArgsVal ? `, baseline-args: \`${baselineArgsVal}\`` : '';
const featureArgsVal = '${{ steps.args.outputs.feature-args }}';
const featureArgsNote = featureArgsVal ? `, feature-args: \`${featureArgsVal}\`` : '';
const blocksDesc = bigBlocks ? 'blocks: `big`' : `${blocks} blocks, ${warmup} warmup blocks`;
const config = `**Config:** ${blocksDesc}, baseline: \`${baseline}\`, feature: \`${feature}\`${samplyNote}${slackNote}${balNote}${coresNote}${abbaNote}${otlpNote}${waitTimeNote}${diskReadDelayNote}${diskWriteDelayNote}${baselineArgsNote}${featureArgsNote}`;
const config = `**Config:** ${blocksDesc}, baseline: \`${baseline}\`, feature: \`${feature}\`${samplyNote}${slackNote}${balNote}${coresNote}${abbaNote}${otlpNote}${waitTimeNote}${baselineArgsNote}${featureArgsNote}`;
const runUrl = `${context.serverUrl}/${context.repo.owner}/${context.repo.repo}/actions/runs/${context.runId}`;
const numRunners = parseInt(process.env.BENCH_RUNNERS) || 1;
@@ -577,8 +544,6 @@ jobs:
BENCH_BIG_BLOCKS: ${{ needs.reth-bench-ack.outputs.big-blocks }}
BENCH_BAL: ${{ needs.reth-bench-ack.outputs.bal }}
BENCH_WAIT_TIME: ${{ needs.reth-bench-ack.outputs.wait-time }}
BENCH_DISK_READ_DELAY_MS: ${{ needs.reth-bench-ack.outputs.disk-read-delay-ms }}
BENCH_DISK_WRITE_DELAY_MS: ${{ needs.reth-bench-ack.outputs.disk-write-delay-ms }}
BENCH_BASELINE_ARGS: ${{ needs.reth-bench-ack.outputs.baseline-args }}
BENCH_FEATURE_ARGS: ${{ needs.reth-bench-ack.outputs.feature-args }}
BENCH_ABBA: ${{ needs.reth-bench-ack.outputs.abba }}
@@ -653,16 +618,12 @@ jobs:
const otlpNote = !otlpEnabled ? ', otlp: `disabled`' : '';
const waitTimeVal = process.env.BENCH_WAIT_TIME || '';
const waitTimeNote = waitTimeVal ? `, wait-time: \`${waitTimeVal}\`` : '';
const diskReadDelayVal = process.env.BENCH_DISK_READ_DELAY_MS || '';
const diskReadDelayNote = diskReadDelayVal && diskReadDelayVal !== '0' ? `, disk-read-delay: \`${diskReadDelayVal}ms\`` : '';
const diskWriteDelayVal = process.env.BENCH_DISK_WRITE_DELAY_MS || '';
const diskWriteDelayNote = diskWriteDelayVal && diskWriteDelayVal !== '0' ? `, disk-write-delay: \`${diskWriteDelayVal}ms\`` : '';
const baselineArgsVal = process.env.BENCH_BASELINE_ARGS || '';
const baselineArgsNote = baselineArgsVal ? `, baseline-args: \`${baselineArgsVal}\`` : '';
const featureArgsVal = process.env.BENCH_FEATURE_ARGS || '';
const featureArgsNote = featureArgsVal ? `, feature-args: \`${featureArgsVal}\`` : '';
const blocksDesc = bigBlocks ? 'blocks: `big`' : `${blocks} blocks, ${warmup} warmup blocks`;
core.exportVariable('BENCH_CONFIG', `**Config:** ${blocksDesc}, baseline: \`${baseline}\`, feature: \`${feature}\`${samplyNote}${slackNote}${balNote}${coresNote}${abbaNote}${otlpNote}${waitTimeNote}${diskReadDelayNote}${diskWriteDelayNote}${baselineArgsNote}${featureArgsNote}`);
core.exportVariable('BENCH_CONFIG', `**Config:** ${blocksDesc}, baseline: \`${baseline}\`, feature: \`${feature}\`${samplyNote}${slackNote}${balNote}${coresNote}${abbaNote}${otlpNote}${waitTimeNote}${baselineArgsNote}${featureArgsNote}`);
const { buildBody } = require('./.github/scripts/bench-update-status.js');
await github.rest.issues.updateComment({
@@ -723,7 +684,7 @@ jobs:
echo "$HOME/.local/bin" >> "$GITHUB_PATH"
echo "$HOME/.cargo/bin" >> "$GITHUB_PATH"
missing=()
for cmd in schelk cpupower taskset stdbuf python3 curl make uv jq dmsetup; do
for cmd in schelk cpupower taskset stdbuf python3 curl make uv jq; do
command -v "$cmd" &>/dev/null || missing+=("$cmd")
done
if [ ${#missing[@]} -gt 0 ]; then
@@ -841,21 +802,26 @@ jobs:
- name: Prepare source dirs
run: |
prepare_source_dir() {
local dir="$1"
local ref="$2"
if [ -d "$dir" ]; then
git -C "$dir" reset --hard HEAD
git -C "$dir" clean -fdx
git -C "$dir" fetch origin "$ref"
else
git clone . "$dir"
fi
git -C "$dir" checkout --force "$ref"
}
BASELINE_REF="${{ steps.refs.outputs.baseline-ref }}"
if [ -d ../reth-baseline ]; then
git -C ../reth-baseline fetch origin "$BASELINE_REF"
else
git clone . ../reth-baseline
fi
git -C ../reth-baseline checkout "$BASELINE_REF"
prepare_source_dir ../reth-baseline "$BASELINE_REF"
FEATURE_REF="${{ steps.refs.outputs.feature-ref }}"
if [ -d ../reth-feature ]; then
git -C ../reth-feature fetch origin "$FEATURE_REF"
else
git clone . ../reth-feature
fi
git -C ../reth-feature checkout "$FEATURE_REF"
prepare_source_dir ../reth-feature "$FEATURE_REF"
- name: Build binaries
id: build

View File

@@ -21,7 +21,7 @@ jobs:
steps:
- uses: actions/checkout@v6
- uses: actions/setup-python@v5
- uses: actions/setup-python@v6
with:
python-version: "3.12"

406
Cargo.lock generated

File diff suppressed because it is too large Load Diff

View File

@@ -326,8 +326,8 @@ reth-cli = { path = "crates/cli/cli" }
reth-cli-commands = { path = "crates/cli/commands" }
reth-cli-runner = { path = "crates/cli/runner" }
reth-cli-util = { path = "crates/cli/util" }
reth-codecs = { version = "0.3.0", default-features = false }
reth-codecs-derive = "0.3.0"
reth-codecs = { version = "0.3.1", default-features = false }
reth-codecs-derive = "0.3.1"
reth-config = { path = "crates/config", default-features = false }
reth-consensus = { path = "crates/consensus/consensus", default-features = false }
reth-consensus-common = { path = "crates/consensus/common", default-features = false }
@@ -395,7 +395,7 @@ reth-payload-builder-primitives = { path = "crates/payload/builder-primitives" }
reth-payload-primitives = { path = "crates/payload/primitives" }
reth-payload-validator = { path = "crates/payload/validator" }
reth-payload-util = { path = "crates/payload/util" }
reth-primitives-traits = { version = "0.3.0", default-features = false }
reth-primitives-traits = { version = "0.3.1", default-features = false }
reth-provider = { path = "crates/storage/provider" }
reth-prune = { path = "crates/prune/prune" }
reth-prune-types = { path = "crates/prune/types", default-features = false }
@@ -411,7 +411,7 @@ reth-rpc-eth-types = { path = "crates/rpc/rpc-eth-types", default-features = fal
reth-rpc-layer = { path = "crates/rpc/rpc-layer" }
reth-rpc-server-types = { path = "crates/rpc/rpc-server-types" }
reth-rpc-convert = { path = "crates/rpc/rpc-convert" }
reth-rpc-traits = { version = "0.3.0", default-features = false }
reth-rpc-traits = { version = "0.3.1", default-features = false }
reth-stages = { path = "crates/stages/stages" }
reth-stages-api = { path = "crates/stages/api" }
reth-stages-types = { path = "crates/stages/types", default-features = false }
@@ -430,17 +430,17 @@ reth-trie-common = { path = "crates/trie/common", default-features = false }
reth-trie-db = { path = "crates/trie/db" }
reth-trie-parallel = { path = "crates/trie/parallel" }
reth-trie-sparse = { path = "crates/trie/sparse", default-features = false }
reth-zstd-compressors = { version = "0.3.0", default-features = false }
reth-zstd-compressors = { version = "0.3.1", default-features = false }
# revm
revm = { version = "38.0.0", default-features = false }
revm-bytecode = { version = "10.0.0", default-features = false }
revm-database = { version = "13.0.0", default-features = false }
revm-state = { version = "11.0.0", default-features = false }
revm-primitives = { version = "23.0.0", default-features = false }
revm-interpreter = { version = "35.0.0", default-features = false }
revm-database-interface = { version = "11.0.0", default-features = false }
revm-inspectors = "0.39.0"
revm = { version = "=37.0.0", default-features = false }
revm-bytecode = { version = "=10.0.0", default-features = false }
revm-database = { version = "=13.0.0", default-features = false }
revm-state = { version = "=11.0.0", default-features = false }
revm-primitives = { version = "=23.0.0", default-features = false }
revm-interpreter = { version = "=35.0.0", default-features = false }
revm-database-interface = { version = "=11.0.0", default-features = false }
revm-inspectors = "=0.39.0"
# eth
alloy-dyn-abi = "1.5.6"
@@ -449,7 +449,7 @@ alloy-sol-types = { version = "1.5.6", default-features = false }
alloy-chains = { version = "0.2.33", default-features = false }
alloy-eip2124 = { version = "0.2.0", default-features = false }
alloy-eip7928 = { version = "0.3.0", default-features = false }
alloy-eip7928 = { version = "0.3.4", default-features = false }
alloy-evm = { version = "0.33.0", default-features = false }
alloy-rlp = { version = "0.3.13", default-features = false, features = ["core-net"] }
alloy-trie = { version = "0.9.4", default-features = false }
@@ -700,3 +700,24 @@ vergen-git2 = "9.1.0"
# networking
ipnet = "2.11"
[patch.crates-io]
revm = { git = "https://github.com/bluealloy/revm", rev = "fe2549d85fb9e201e7b629f8b47bcca46d49aa1d" }
revm-bytecode = { git = "https://github.com/bluealloy/revm", rev = "fe2549d85fb9e201e7b629f8b47bcca46d49aa1d" }
revm-context = { git = "https://github.com/bluealloy/revm", rev = "fe2549d85fb9e201e7b629f8b47bcca46d49aa1d" }
revm-context-interface = { git = "https://github.com/bluealloy/revm", rev = "fe2549d85fb9e201e7b629f8b47bcca46d49aa1d" }
revm-database = { git = "https://github.com/bluealloy/revm", rev = "fe2549d85fb9e201e7b629f8b47bcca46d49aa1d" }
revm-database-interface = { git = "https://github.com/bluealloy/revm", rev = "fe2549d85fb9e201e7b629f8b47bcca46d49aa1d" }
revm-handler = { git = "https://github.com/bluealloy/revm", rev = "fe2549d85fb9e201e7b629f8b47bcca46d49aa1d" }
revm-inspector = { git = "https://github.com/bluealloy/revm", rev = "fe2549d85fb9e201e7b629f8b47bcca46d49aa1d" }
revm-interpreter = { git = "https://github.com/bluealloy/revm", rev = "fe2549d85fb9e201e7b629f8b47bcca46d49aa1d" }
revm-precompile = { git = "https://github.com/bluealloy/revm", rev = "fe2549d85fb9e201e7b629f8b47bcca46d49aa1d" }
revm-primitives = { git = "https://github.com/bluealloy/revm", rev = "fe2549d85fb9e201e7b629f8b47bcca46d49aa1d" }
revm-state = { git = "https://github.com/bluealloy/revm", rev = "fe2549d85fb9e201e7b629f8b47bcca46d49aa1d" }
revm-inspectors = { git = "https://github.com/paradigmxyz/revm-inspectors", rev = "a2c7a41977b468d016a339f560acb76e002766f3" }
alloy-evm = { git = "https://github.com/alloy-rs/evm", rev = "da7633f6bc9554f5a6e60773ef21b8e9d6e0cca6" }
reth-codecs = { git = "https://github.com/paradigmxyz/reth-core", rev = "c763480b9fa51957fbdb69b7caead5dfc4e3752c" }
reth-codecs-derive = { git = "https://github.com/paradigmxyz/reth-core", rev = "c763480b9fa51957fbdb69b7caead5dfc4e3752c" }
reth-primitives-traits = { git = "https://github.com/paradigmxyz/reth-core", rev = "c763480b9fa51957fbdb69b7caead5dfc4e3752c" }
reth-rpc-traits = { git = "https://github.com/paradigmxyz/reth-core", rev = "c763480b9fa51957fbdb69b7caead5dfc4e3752c" }
reth-zstd-compressors = { git = "https://github.com/paradigmxyz/reth-core", rev = "c763480b9fa51957fbdb69b7caead5dfc4e3752c" }

View File

@@ -69,6 +69,7 @@ default = [
"jemalloc",
"reth-cli-util/jemalloc",
"asm-keccak",
"keccak-cache-global",
"min-debug-logs",
]
@@ -89,6 +90,12 @@ asm-keccak = [
"revm-primitives/asm-keccak",
]
keccak-cache-global = [
"reth-node-core/keccak-cache-global",
"reth-node-ethereum/keccak-cache-global",
"alloy-primitives/keccak-cache-global",
]
min-debug-logs = [
"tracing/release_max_level_debug",
"reth-ethereum-cli/min-debug-logs",

View File

@@ -21,6 +21,8 @@ pub(crate) struct BenchContext {
pub(crate) auth_provider: RootProvider<AnyNetwork>,
/// The block provider is used for block queries.
pub(crate) block_provider: RootProvider<AnyNetwork>,
/// The local regular RPC provider is used for non-authenticated node RPCs like `testing_*`.
pub(crate) local_rpc_provider: RootProvider<AnyNetwork>,
/// The benchmark mode, which defines whether the benchmark should run for a closed or open
/// range of blocks.
pub(crate) benchmark_mode: BenchMode,
@@ -83,6 +85,11 @@ impl BenchContext {
let client = ClientBuilder::default().connect_with(auth_transport).await?;
let auth_provider = RootProvider::<AnyNetwork>::new(client);
let local_rpc_url = Url::parse(&bench_args.local_rpc_url)?;
info!(target: "reth-bench", "Connecting to local regular RPC at {} for testing namespace calls", local_rpc_url);
let local_rpc_provider =
RootProvider::<AnyNetwork>::new(ClientBuilder::default().http(local_rpc_url));
// Computes the block range for the benchmark.
//
// - If `--advance` is provided, fetches the latest block from the engine and sets:
@@ -159,6 +166,7 @@ impl BenchContext {
Ok(Self {
auth_provider,
block_provider,
local_rpc_provider,
benchmark_mode,
next_block,
use_reth_namespace,

View File

@@ -14,14 +14,24 @@ use crate::{
block_to_new_payload, call_forkchoice_updated_with_reth, call_new_payload_with_reth,
},
};
use alloy_provider::{ext::DebugApi, Provider};
use alloy_rpc_types_engine::ForkchoiceState;
use alloy_consensus::TxEnvelope;
use alloy_eips::Encodable2718;
use alloy_primitives::B256;
use alloy_provider::{
ext::DebugApi,
network::{AnyNetwork, AnyRpcBlock},
Provider, RootProvider,
};
use alloy_rpc_types_engine::{
ExecutionData, ExecutionPayloadEnvelopeV5, ForkchoiceState, PayloadAttributes,
};
use clap::Parser;
use eyre::{Context, OptionExt};
use eyre::{bail, ensure, Context, OptionExt};
use futures::{stream, StreamExt, TryStreamExt};
use reth_cli_runner::CliContext;
use reth_engine_primitives::config::DEFAULT_PERSISTENCE_THRESHOLD;
use reth_node_core::args::BenchmarkArgs;
use reth_rpc_api::{RethNewPayloadInput, TestingBuildBlockRequestV1};
use std::time::{Duration, Instant};
use tracing::{debug, info, warn};
@@ -32,6 +42,22 @@ pub struct Command {
#[arg(long, value_name = "RPC_URL", verbatim_doc_comment)]
rpc_url: String,
/// Build a separate fork with `testing_buildBlockV1` and alternate forkchoice updates between
/// the canonical chain and that fork on every block while the fork grows up to the configured
/// depth.
///
/// This requires enabling the hidden `testing` RPC module on the target node,
/// for example with `reth node --http --http.api eth,testing`.
#[arg(
long,
value_name = "DEPTH",
num_args = 0..=1,
default_missing_value = "8",
value_parser = parse_reorg_depth,
verbatim_doc_comment
)]
reorg: Option<usize>,
/// How long to wait after a forkchoice update before sending the next payload.
///
/// Accepts a duration string (e.g. `100ms`, `2s`) or a bare integer treated as
@@ -83,18 +109,79 @@ pub struct Command {
benchmark: BenchmarkArgs,
}
#[derive(Debug)]
struct PreparedBuiltBlock {
block_hash: B256,
params: serde_json::Value,
}
#[derive(Debug)]
struct QueuedForkBlock {
block_number: u64,
prepared: PreparedBuiltBlock,
}
#[derive(Debug)]
struct ReorgState {
depth: usize,
fork_length: usize,
branch_point_hash: Option<B256>,
fork_parent_hash: Option<B256>,
}
impl ReorgState {
const fn new(depth: usize) -> Self {
Self { depth, fork_length: 0, branch_point_hash: None, fork_parent_hash: None }
}
const fn push_fork_head(&mut self, canonical_parent_hash: B256, fork_head_hash: B256) {
if self.fork_length == 0 {
self.branch_point_hash = Some(canonical_parent_hash);
}
self.fork_length += 1;
self.fork_parent_hash = Some(fork_head_hash);
}
fn forkchoice_state(&self, fork_head_hash: B256) -> eyre::Result<ForkchoiceState> {
let branch_point_hash = self.branch_point_hash.ok_or_eyre("missing reorg branch point")?;
Ok(ForkchoiceState {
head_block_hash: fork_head_hash,
safe_block_hash: branch_point_hash,
finalized_block_hash: branch_point_hash,
})
}
const fn reset(&mut self) {
self.fork_length = 0;
self.branch_point_hash = None;
self.fork_parent_hash = None;
}
}
impl Command {
/// Execute `benchmark new-payload-fcu` command
pub async fn execute(self, _ctx: CliContext) -> eyre::Result<()> {
if self.reorg.is_some() && self.benchmark.rlp_blocks {
bail!("--reorg cannot be combined with --rlp-blocks")
}
if self.reorg.is_some() && self.enable_bal {
bail!("--reorg cannot be combined with --enable-bal")
}
// Log mode configuration
if let Some(duration) = self.wait_time {
info!(target: "reth-bench", "Using wait-time mode with {}ms minimum interval between blocks", duration.as_millis());
}
if let Some(depth) = self.reorg {
info!(target: "reth-bench", depth, "Using testing_buildBlockV1 reorg mode");
}
let BenchContext {
benchmark_mode,
block_provider,
auth_provider,
local_rpc_provider,
next_block,
use_reth_namespace,
rlp_blocks,
@@ -182,7 +269,8 @@ impl Command {
let mut blocks_processed = 0u64;
let total_benchmark_duration = Instant::now();
let mut total_wait_time = Duration::ZERO;
let mut reorg_state = self.reorg.map(ReorgState::new);
let mut queued_fork_block = None;
while let Some((block, head, safe, finalized, rlp)) = {
let wait_start = Instant::now();
let result = blocks.try_next().await?;
@@ -192,11 +280,13 @@ impl Command {
let gas_used = block.header.gas_used;
let gas_limit = block.header.gas_limit;
let block_number = block.header.number;
let canonical_parent_hash = block.header.parent_hash;
let transaction_count = block.transactions.len() as u64;
debug!(target: "reth-bench", ?block_number, "Sending payload");
let forkchoice_state = ForkchoiceState {
let deferred_branch_start_block = reorg_state
.as_ref()
.filter(|state| state.fork_length == 0 && queued_fork_block.is_none())
.map(|_| block.clone());
let canonical_forkchoice_state = ForkchoiceState {
head_block_hash: head,
safe_block_hash: safe,
finalized_block_hash: finalized,
@@ -218,10 +308,11 @@ impl Command {
no_wait_for_caches,
bal,
)?;
debug!(target: "reth-bench", ?block_number, "Sending payload");
let start = Instant::now();
let server_timings =
call_new_payload_with_reth(&auth_provider, version, params).await?;
let np_latency =
server_timings.as_ref().map(|t| t.latency).unwrap_or_else(|| start.elapsed());
let new_payload_result = NewPayloadResult {
@@ -242,17 +333,12 @@ impl Command {
};
let fcu_start = Instant::now();
call_forkchoice_updated_with_reth(&auth_provider, version, forkchoice_state).await?;
call_forkchoice_updated_with_reth(&auth_provider, version, canonical_forkchoice_state)
.await?;
let fcu_latency = fcu_start.elapsed();
let total_latency = if server_timings.is_some() {
// When using server-side latency for newPayload, derive total from the
// independently measured components to avoid mixing server-side and
// client-side (network-inclusive) timings.
np_latency + fcu_latency
} else {
start.elapsed()
};
let total_latency =
if server_timings.is_some() { np_latency + fcu_latency } else { start.elapsed() };
let combined_result = CombinedResult {
block_number,
gas_limit,
@@ -262,6 +348,88 @@ impl Command {
total_latency,
};
if let Some(reorg_state) = reorg_state.as_mut() {
if queued_fork_block.is_none() && reorg_state.fork_length == 0 {
// A branch start uses a canonical parent, so it can be built lazily here
// instead of being queued ahead of time.
let block = deferred_branch_start_block
.as_ref()
.ok_or_eyre("missing deferred fork block for reorg branch start")?;
queued_fork_block = Some(QueuedForkBlock {
block_number,
prepared: prepare_built_block(
&local_rpc_provider,
block,
canonical_parent_hash,
no_wait_for_caches,
)
.await?,
});
}
let queued = queued_fork_block
.take()
.ok_or_eyre("missing queued fork block for reorg replay")?;
ensure!(
queued.block_number == block_number,
"queued fork block {} does not match source block {}",
queued.block_number,
block_number
);
let prepared = queued.prepared;
call_new_payload_with_reth(&auth_provider, None, prepared.params).await?;
reorg_state.push_fork_head(canonical_parent_hash, prepared.block_hash);
let forkchoice_state = reorg_state.forkchoice_state(prepared.block_hash)?;
info!(
target: "reth-bench",
block_number,
branch_point = %forkchoice_state.safe_block_hash,
fork_head = %prepared.block_hash,
fork_depth = reorg_state.fork_length,
max_reorg_depth = reorg_state.depth,
"Switching forkchoice to reorg branch"
);
let fcu_start = Instant::now();
call_forkchoice_updated_with_reth(&auth_provider, None, forkchoice_state).await?;
let _fork_fcu_latency = fcu_start.elapsed();
let next_fork_block_number = block_number + 1;
if reorg_state.fork_length < reorg_state.depth {
queued_fork_block = queue_fork_block(
&block_provider,
&local_rpc_provider,
&benchmark_mode,
next_fork_block_number,
Some(prepared.block_hash),
no_wait_for_caches,
)
.await?;
} else {
info!(
target: "reth-bench",
block_number,
reorg_depth = reorg_state.depth,
"Resetting reorg branch after reaching max depth"
);
// `testing_buildBlockV1` resolves the parent from canonical state, so switch
// back to the source chain before reseeding the next queued fork block.
call_forkchoice_updated_with_reth(
&auth_provider,
version,
canonical_forkchoice_state,
)
.await?;
reorg_state.reset();
queued_fork_block = None;
}
}
// Exclude time spent waiting on the block prefetch channel from the benchmark duration.
// We want to measure engine throughput, not RPC fetch latency.
blocks_processed += 1;
@@ -318,3 +486,155 @@ impl Command {
Ok(())
}
}
async fn prepare_built_block(
block_provider: &RootProvider<AnyNetwork>,
block: &AnyRpcBlock,
parent_block_hash: B256,
no_wait_for_caches: bool,
) -> eyre::Result<PreparedBuiltBlock> {
const MAX_BUILD_ATTEMPTS: usize = 10;
const BUILD_RETRY_INTERVAL: Duration = Duration::from_millis(100);
let request = build_block_request(block, parent_block_hash)?;
let built_payload: ExecutionPayloadEnvelopeV5 = {
let mut attempts_remaining = MAX_BUILD_ATTEMPTS;
loop {
match block_provider.client().request("testing_buildBlockV1", [request.clone()]).await {
Ok(payload) => break payload,
Err(err) if attempts_remaining > 1 && is_retryable_build_block_error(&err) => {
warn!(
target: "reth-bench",
block_number = block.header.number,
%parent_block_hash,
attempts_remaining,
error = %err,
"Retrying testing_buildBlockV1 after transient fork build failure"
);
attempts_remaining -= 1;
tokio::time::sleep(BUILD_RETRY_INTERVAL).await;
}
Err(err) => {
return Err(err).wrap_err_with(|| {
format!(
"Failed to build block {} via testing_buildBlockV1",
block.header.number
)
})
}
}
}
};
let payload = &built_payload.execution_payload.payload_inner.payload_inner;
let block_hash = payload.block_hash;
let (payload, sidecar) = built_payload
.into_payload_and_sidecar(block.header.parent_beacon_block_root.unwrap_or_default());
// Fork payloads are built immediately before the next `testing_buildBlockV1` call. Leaving
// reth's default persistence wait enabled here gives the regular RPC side a consistent base
// state for the next synthetic fork block build.
let params = serde_json::to_value((
RethNewPayloadInput::ExecutionData(ExecutionData { payload, sidecar }),
None::<bool>,
no_wait_for_caches.then_some(false),
))?;
Ok(PreparedBuiltBlock { block_hash, params })
}
#[allow(clippy::too_many_arguments)]
async fn queue_fork_block(
block_provider: &RootProvider<AnyNetwork>,
local_rpc_provider: &RootProvider<AnyNetwork>,
benchmark_mode: &crate::bench_mode::BenchMode,
block_number: u64,
parent_block_hash: Option<B256>,
no_wait_for_caches: bool,
) -> eyre::Result<Option<QueuedForkBlock>> {
if !benchmark_mode.contains(block_number) {
return Ok(None)
}
let future_block = block_provider
.get_block_by_number(alloy_eips::BlockNumberOrTag::Number(block_number))
.full()
.await
.wrap_err_with(|| format!("Failed to fetch block by number {block_number}"))?
.ok_or_eyre("Block not found")?;
let parent_block_hash = parent_block_hash.unwrap_or(future_block.header.parent_hash);
Ok(Some(QueuedForkBlock {
block_number,
prepared: prepare_built_block(
local_rpc_provider,
&future_block,
parent_block_hash,
no_wait_for_caches,
)
.await?,
}))
}
fn is_retryable_build_block_error(err: &alloy_transport::TransportError) -> bool {
let message = err.to_string();
message.contains("block not found: hash") ||
message.contains("block hash not found for block number")
}
fn build_block_request(
block: &AnyRpcBlock,
parent_block_hash: B256,
) -> eyre::Result<TestingBuildBlockRequestV1> {
let mut transactions = block
.clone()
.try_into_transactions()
.map_err(|_| eyre::eyre!("Block transactions must be fetched in full for --reorg"))?
.into_iter()
.map(|tx| {
let tx: TxEnvelope =
tx.try_into().map_err(|_| eyre::eyre!("unsupported tx type in RPC block"))?;
if tx.is_eip4844() {
return Ok(None)
}
Ok(Some(tx.encoded_2718().into()))
})
.filter_map(|tx| tx.transpose())
.collect::<eyre::Result<Vec<_>>>()?;
// `testing_buildBlockV1` only takes raw transaction bytes, so we exclude blob transactions
// from the synthetic fork blocks rather than trying to reconstruct their sidecars.
// Keep only 90% of the remaining transactions so the alternate branch produces a materially
// different post-state instead of only differing by header data.
let keep = transactions.len().saturating_mul(9) / 10;
transactions.truncate(keep);
let rpc_block = block.clone().into_inner();
Ok(TestingBuildBlockRequestV1 {
parent_block_hash,
payload_attributes: PayloadAttributes {
timestamp: block.header.timestamp,
prev_randao: block.header.mix_hash.unwrap_or_default(),
suggested_fee_recipient: block.header.beneficiary,
withdrawals: rpc_block.withdrawals.map(|withdrawals| withdrawals.into_inner()),
parent_beacon_block_root: block.header.parent_beacon_block_root,
slot_number: block.header.slot_number,
},
transactions,
extra_data: Some(block.header.extra_data.clone()),
})
}
fn parse_reorg_depth(value: &str) -> Result<usize, String> {
let depth = value
.trim()
.parse::<usize>()
.map_err(|_| format!("invalid reorg depth {value:?}, expected a positive integer"))?;
if depth == 0 {
return Err("reorg depth must be greater than 0".to_string())
}
Ok(depth)
}

View File

@@ -54,6 +54,7 @@ impl Command {
rlp_blocks,
wait_for_persistence,
no_wait_for_caches,
..
} = BenchContext::new(&self.benchmark, self.rpc_url).await?;
let total_blocks = benchmark_mode.total_blocks();

View File

@@ -150,16 +150,22 @@ impl<C: ChainSpecParser> EnvironmentArgs<C> {
// commands can proceed.
debug!(target: "reth::cli", ?rocksdb_path, "RocksDB not found, initializing empty database");
reth_fs_util::create_dir_all(&rocksdb_path)?;
RocksDBProvider::builder(data_dir.rocksdb())
let mut builder = RocksDBProvider::builder(data_dir.rocksdb())
.with_default_tables()
.with_database_log_level(self.db.log_level)
.build()?
.with_database_log_level(self.db.log_level);
if let Some(cache_size) = self.db.rocksdb_block_cache_size {
builder = builder.with_block_cache_size(cache_size);
}
builder.build()?
} else {
RocksDBProvider::builder(data_dir.rocksdb())
let mut builder = RocksDBProvider::builder(data_dir.rocksdb())
.with_default_tables()
.with_database_log_level(self.db.log_level)
.with_read_only(!access.is_read_write())
.build()?
.with_read_only(!access.is_read_write());
if let Some(cache_size) = self.db.rocksdb_block_cache_size {
builder = builder.with_block_cache_size(cache_size);
}
builder.build()?
};
let provider_factory =

View File

@@ -14,7 +14,7 @@ use reth_db_api::{
table::{Compress, Decompress, DupSort, Table},
tables,
transaction::DbTx,
RawKey, RawTable, Receipts, TableViewer, Transactions,
RawKey, RawTable, TableViewer,
};
use reth_db_common::DbTool;
use reth_node_api::{HeaderTy, ReceiptTy, TxTy};
@@ -264,15 +264,12 @@ impl Command {
);
}
StaticFileSegment::Transactions => {
let transaction = <<Transactions as Table>::Value>::decompress(
content[0].as_slice(),
)?;
let transaction = TxTy::<N>::decompress(content[0].as_slice())?;
println!("{}", serde_json::to_string_pretty(&transaction)?);
}
StaticFileSegment::Receipts => {
let receipt = <<Receipts as Table>::Value>::decompress(
content[0].as_slice(),
)?;
let receipt =
ReceiptTy::<N>::decompress(content[0].as_slice())?;
println!("{}", serde_json::to_string_pretty(&receipt)?);
}
StaticFileSegment::TransactionSenders => {

View File

@@ -50,8 +50,13 @@ where
info!(target: "reth::cli", new_tip = ?header.num_hash(), "Setting up dummy EVM chain before importing state.");
let static_file_provider = provider_rw.static_file_provider();
// Write EVM dummy data up to `header - 1` block
append_dummy_chain(&static_file_provider, header.number() - 1, header_factory)?;
// Write EVM dummy data up to `header - 1` block. Skip when the supplied
// header is at block 0: `header.number() - 1` would underflow in u64 to
// `u64::MAX`, sending `append_dummy_chain` into a 1..=u64::MAX loop that
// exhausts memory before failing.
if header.number() > 0 {
append_dummy_chain(&static_file_provider, header.number() - 1, header_factory)?;
}
info!(target: "reth::cli", "Appending first valid block.");
@@ -191,7 +196,13 @@ mod tests {
use alloy_primitives::{address, b256};
use reth_db_common::init::init_genesis;
use reth_provider::{test_utils::create_test_provider_factory, DatabaseProviderFactory};
use std::io::Write;
use std::{
io::Write,
sync::{
atomic::{AtomicU64, Ordering},
Arc,
},
};
use tempfile::NamedTempFile;
#[test]
@@ -264,4 +275,45 @@ mod tests {
assert_eq!(actual_next_height, expected_next_height);
}
/// Regression: a header at block 0 used to send `append_dummy_chain` into
/// a `1..=u64::MAX` loop because `header.number() - 1` underflowed in
/// u64. The guard `if header.number() > 0` skips the dummy-chain step
/// when there is no pre-genesis range to backfill, so `header_factory`
/// is never invoked.
#[test]
fn test_setup_without_evm_skips_dummy_chain_for_genesis_header() {
let header = Header { number: 0, ..Default::default() };
let header_hash = header.hash_slow();
let provider_factory = create_test_provider_factory();
init_genesis(&provider_factory).unwrap();
let provider_rw = provider_factory.database_provider_rw().unwrap();
let factory_calls = Arc::new(AtomicU64::new(0));
let factory_calls_inner = Arc::clone(&factory_calls);
// The Result of `setup_without_evm` itself is not asserted: with
// `number == 0` plus a genesis already written by `init_genesis`,
// the subsequent `append_first_block` may legitimately fail. The
// bug under test is the OOM in the dummy-chain loop, observable
// through the factory-call counter below.
let _ = setup_without_evm(
&provider_rw,
SealedHeader::new(header, header_hash),
move |number| {
// Bound calls so a regression cannot exhaust the test
// runner's memory; the only correct value here is 0.
let n = factory_calls_inner.fetch_add(1, Ordering::Relaxed);
assert!(n < 8, "header_factory must not be invoked for a genesis-block header");
Header { number, ..Default::default() }
},
);
assert_eq!(
factory_calls.load(Ordering::Relaxed),
0,
"append_dummy_chain must be skipped when header.number() == 0"
);
}
}

View File

@@ -188,7 +188,7 @@ impl<C: ChainSpecParser> DownloadArgs<C> {
)
}
config.peers.trusted_nodes_only = self.network.trusted_only;
config.peers.trusted_nodes_only |= self.network.trusted_only;
let default_secret_key_path = data_dir.p2p_secret();
let p2p_secret_key = self.network.secret_key(default_secret_key_path)?;

View File

@@ -5,6 +5,7 @@ use crate::common::{
EnvironmentArgs,
};
use alloy_consensus::{transaction::TxHashRef, BlockHeader, TxReceipt};
use alloy_primitives::{Address, B256, U256};
use clap::Parser;
use eyre::WrapErr;
use reth_chainspec::{EthChainSpec, EthereumHardforks, Hardforks};
@@ -12,15 +13,19 @@ use reth_cli::chainspec::ChainSpecParser;
use reth_cli_util::cancellation::CancellationToken;
use reth_consensus::FullConsensus;
use reth_evm::{execute::Executor, ConfigureEvm};
use reth_primitives_traits::{format_gas_throughput, BlockBody, GotExpected};
use reth_primitives_traits::{format_gas_throughput, Account, BlockBody, GotExpected};
use reth_provider::{
BlockNumReader, BlockReader, ChainSpecProvider, DatabaseProviderFactory, ReceiptProvider,
StaticFileProviderFactory, TransactionVariant,
};
use reth_revm::database::StateProviderDatabase;
use reth_revm::{
database::StateProviderDatabase,
db::{states::reverts::AccountInfoRevert, BundleState},
};
use reth_stages::stages::calculate_gas_used_from_headers;
use reth_storage_api::{DBProvider, TryIntoHistoricalStateProvider};
use reth_storage_api::{ChangeSetReader, DBProvider, StorageChangeSetReader};
use std::{
collections::HashMap,
sync::{
atomic::{AtomicU64, Ordering},
Arc,
@@ -69,13 +74,18 @@ impl<C: ChainSpecParser> Command<C> {
impl<C: ChainSpecParser<ChainSpec: EthChainSpec + Hardforks + EthereumHardforks>> Command<C> {
/// Execute `re-execute` command
pub async fn execute<N>(
self,
mut self,
components: impl CliComponentsBuilder<N>,
runtime: reth_tasks::Runtime,
) -> eyre::Result<()>
where
N: CliNodeTypes<ChainSpec = C::ChainSpec>,
{
// Default to 4GB RocksDB block cache for re-execute unless explicitly set.
if self.env.db.rocksdb_block_cache_size.is_none() {
self.env.db.rocksdb_block_cache_size = Some(4 << 30);
}
let Environment { provider_factory, .. } = self.env.init::<N>(AccessRights::RO, runtime)?;
let components = components(provider_factory.chain_spec());
@@ -109,20 +119,6 @@ impl<C: ChainSpecParser<ChainSpec: EthChainSpec + Hardforks + EthereumHardforks>
min_block..=max_block,
)?;
let db_at = {
let provider_factory = provider_factory.clone();
move |block_number: u64| {
StateProviderDatabase(
provider_factory
.provider()
.unwrap()
.disable_long_read_transaction_safety()
.try_into_history_at_block(block_number)
.unwrap(),
)
}
};
let skip_invalid_blocks = self.skip_invalid_blocks;
let blocks_per_chunk = self.blocks_per_chunk;
let (stats_tx, mut stats_rx) = mpsc::unbounded_channel();
@@ -138,13 +134,23 @@ impl<C: ChainSpecParser<ChainSpec: EthChainSpec + Hardforks + EthereumHardforks>
let provider_factory = provider_factory.clone();
let evm_config = components.evm_config().clone();
let consensus = components.consensus().clone();
let db_at = db_at.clone();
let stats_tx = stats_tx.clone();
let info_tx = info_tx.clone();
let cancellation = cancellation.clone();
let next_block = Arc::clone(&next_block);
tasks.spawn_blocking(move || {
let executor_lifetime = Duration::from_secs(600);
let provider = provider_factory.database_provider_ro()?.disable_long_read_transaction_safety();
let db_at = {
|block_number: u64| {
StateProviderDatabase(
provider
.history_by_block_number(block_number)
.unwrap(),
)
}
};
loop {
if cancellation.is_cancelled() {
@@ -254,11 +260,28 @@ impl<C: ChainSpecParser<ChainSpec: EthChainSpec + Hardforks + EthereumHardforks>
if executor.size_hint() > 5_000_000 ||
executor_created.elapsed() > executor_lifetime
{
executor =
evm_config.batch_executor(db_at(block.number()));
let last_block = block.number();
let old_executor = std::mem::replace(
&mut executor,
evm_config.batch_executor(db_at(last_block)),
);
let bundle = old_executor.into_state().take_bundle();
verify_bundle_against_changesets(
&provider,
&bundle,
last_block,
)?;
executor_created = Instant::now();
}
}
// Full verification at chunk end for remaining unverified blocks
let bundle = executor.into_state().take_bundle();
verify_bundle_against_changesets(
&provider,
&bundle,
chunk_end - 1,
)?;
}
eyre::Ok(())
@@ -339,3 +362,98 @@ impl<C: ChainSpecParser<ChainSpec: EthChainSpec + Hardforks + EthereumHardforks>
Ok(())
}
}
/// Verifies reverts against database changesets.
///
/// For each block, reverts must match changeset entries exactly. No extra slots/accounts
/// in reverts for non-destroyed accounts. Destroyed accounts may have extra changeset slots
/// (from DB storage wipe) absent from reverts.
fn verify_bundle_against_changesets<P>(
provider: &P,
bundle: &BundleState,
last_block: u64,
) -> eyre::Result<()>
where
P: ChangeSetReader + StorageChangeSetReader,
{
// Verify reverts against changesets per block
for (i, block_reverts) in bundle.reverts.iter().rev().enumerate() {
let block_number = last_block - i as u64;
let mut cs_accounts: HashMap<Address, Option<Account>> = provider
.account_block_changeset(block_number)?
.into_iter()
.map(|cs| (cs.address, cs.info))
.collect();
let mut cs_storage: HashMap<Address, HashMap<B256, U256>> = HashMap::new();
for (bna, entry) in provider.storage_changeset(block_number)? {
cs_storage.entry(bna.address()).or_default().insert(entry.key, entry.value);
}
for (addr, revert) in block_reverts {
// Verify account info
match &revert.account {
AccountInfoRevert::DoNothing => {
eyre::ensure!(
!cs_accounts.contains_key(addr),
"Block {block_number}: account {addr} in changeset but revert is DoNothing",
);
}
AccountInfoRevert::DeleteIt => {
let cs_info = cs_accounts.remove(addr).ok_or_else(|| {
eyre::eyre!("Block {block_number}: account {addr} revert is DeleteIt but not in changeset")
})?;
eyre::ensure!(
cs_info.is_none(),
"Block {block_number}: account {addr} revert is DeleteIt but changeset has {cs_info:?}",
);
}
AccountInfoRevert::RevertTo(info) => {
let cs_info = cs_accounts.remove(addr).ok_or_else(|| {
eyre::eyre!("Block {block_number}: account {addr} revert is RevertTo but not in changeset")
})?;
let revert_acct = Some(Account::from(info));
eyre::ensure!(
revert_acct == cs_info,
"Block {block_number}: account {addr} info mismatch: revert={revert_acct:?} cs={cs_info:?}",
);
}
}
// Verify storage slots — remove matched changeset entries as we go
let mut cs_slots = cs_storage.get_mut(addr);
for (slot_key, revert_slot) in &revert.storage {
let b256_key = B256::from(*slot_key);
match cs_slots.as_mut().and_then(|s| s.remove(&b256_key)) {
Some(cs_value) => eyre::ensure!(
revert_slot.to_previous_value() == cs_value,
"Block {block_number}: {addr} slot {b256_key} mismatch: \
revert={} cs={cs_value}",
revert_slot.to_previous_value(),
),
None => eyre::ensure!(
revert.wipe_storage,
"Block {block_number}: {addr} slot {b256_key} in reverts but not in changeset",
),
}
}
// Any remaining cs_storage slots for this address must be from a destroyed account
if let Some(remaining) = cs_slots.filter(|s| !s.is_empty()) {
eyre::ensure!(
revert.wipe_storage,
"Block {block_number}: {addr} has {} unmatched storage slots in changeset",
remaining.len(),
);
}
}
// Any remaining cs_accounts entries had no corresponding revert
if let Some(addr) = cs_accounts.keys().next() {
eyre::bail!("Block {block_number}: account {addr} in changeset but not in reverts");
}
}
Ok(())
}

View File

@@ -6,7 +6,7 @@ use reth_db_api::{
};
use reth_db_common::DbTool;
use reth_evm::ConfigureEvm;
use reth_node_api::HeaderTy;
use reth_node_api::{HeaderTy, TxTy};
use reth_node_core::dirs::{ChainPath, DataDirPath};
use reth_provider::{
providers::{ProviderNodeTypes, RocksDBProvider, StaticFileProvider},
@@ -88,7 +88,7 @@ fn import_tables_with_range<N: ProviderNodeTypes>(
)
})??;
output_db.update(|tx| {
tx.import_table_with_range::<tables::BlockOmmers, _>(
tx.import_table_with_range::<tables::BlockOmmers<HeaderTy<N>>, _>(
&db_tool.provider_factory.db_ref().tx()?,
Some(from),
to,
@@ -110,7 +110,7 @@ fn import_tables_with_range<N: ProviderNodeTypes>(
})??;
output_db.update(|tx| {
tx.import_table_with_range::<tables::Transactions, _>(
tx.import_table_with_range::<tables::Transactions<TxTy<N>>, _>(
&db_tool.provider_factory.db_ref().tx()?,
Some(from_tx),
to_tx,

View File

@@ -210,7 +210,7 @@ impl<C: ChainSpecParser<ChainSpec: EthChainSpec + Hardforks + EthereumHardforks>
let consensus = Arc::new(components.consensus().clone());
let mut config = config;
config.peers.trusted_nodes_only = self.network.trusted_only;
config.peers.trusted_nodes_only |= self.network.trusted_only;
config.peers.trusted_nodes.extend(self.network.trusted_peers.clone());
let network_secret_path = self

View File

@@ -15,6 +15,9 @@ pub const DEFAULT_MEMORY_BLOCK_BUFFER_TARGET: u64 = 0;
/// The size of proof targets chunk to spawn in one multiproof calculation.
pub const DEFAULT_MULTIPROOF_TASK_CHUNK_SIZE: usize = 5;
/// Default number of cache hits before an invalid header entry is evicted and reprocessed.
pub const DEFAULT_INVALID_HEADER_HIT_EVICTION_THRESHOLD: u8 = 128;
/// Gas threshold below which the small block chunk size is used.
pub const SMALL_BLOCK_GAS_THRESHOLD: u64 = 20_000_000;
@@ -102,6 +105,11 @@ pub struct TreeConfig {
block_buffer_limit: u32,
/// Number of invalid headers to keep in cache.
max_invalid_header_cache_length: u32,
/// Number of cache hits before an invalid header entry is evicted and reprocessed.
///
/// Setting this to `0` effectively disables the cache because entries are evicted on the
/// first lookup.
invalid_header_hit_eviction_threshold: u8,
/// Maximum number of blocks to execute sequentially in a batch.
///
/// This is used as a cutoff to prevent long-running sequential block execution when we receive
@@ -206,6 +214,7 @@ impl Default for TreeConfig {
persistence_backpressure_threshold: DEFAULT_PERSISTENCE_BACKPRESSURE_THRESHOLD,
block_buffer_limit: DEFAULT_BLOCK_BUFFER_LIMIT,
max_invalid_header_cache_length: DEFAULT_MAX_INVALID_HEADER_CACHE_LENGTH,
invalid_header_hit_eviction_threshold: DEFAULT_INVALID_HEADER_HIT_EVICTION_THRESHOLD,
max_execute_block_batch_size: DEFAULT_MAX_EXECUTE_BLOCK_BATCH_SIZE,
legacy_state_root: false,
always_compare_trie_updates: false,
@@ -248,6 +257,7 @@ impl TreeConfig {
persistence_backpressure_threshold: u64,
block_buffer_limit: u32,
max_invalid_header_cache_length: u32,
invalid_header_hit_eviction_threshold: u8,
max_execute_block_batch_size: usize,
legacy_state_root: bool,
always_compare_trie_updates: bool,
@@ -281,6 +291,7 @@ impl TreeConfig {
persistence_backpressure_threshold,
block_buffer_limit,
max_invalid_header_cache_length,
invalid_header_hit_eviction_threshold,
max_execute_block_batch_size,
legacy_state_root,
always_compare_trie_updates,
@@ -338,6 +349,14 @@ impl TreeConfig {
self.max_invalid_header_cache_length
}
/// Return the invalid header cache hit eviction threshold.
///
/// Setting this to `0` effectively disables the cache because entries are evicted on the
/// first lookup.
pub const fn invalid_header_hit_eviction_threshold(&self) -> u8 {
self.invalid_header_hit_eviction_threshold
}
/// Return the maximum execute block batch size.
pub const fn max_execute_block_batch_size(&self) -> usize {
self.max_execute_block_batch_size
@@ -468,6 +487,15 @@ impl TreeConfig {
self
}
/// Setter for the invalid header cache hit eviction threshold.
pub const fn with_invalid_header_hit_eviction_threshold(
mut self,
invalid_header_hit_eviction_threshold: u8,
) -> Self {
self.invalid_header_hit_eviction_threshold = invalid_header_hit_eviction_threshold;
self
}
/// Setter for maximum execute block batch size.
pub const fn with_max_execute_block_batch_size(
mut self,

View File

@@ -8,25 +8,28 @@ use schnellru::{ByLength, LruMap};
use std::fmt::Debug;
use tracing::warn;
/// The max hit counter for invalid headers in the cache before it is forcefully evicted.
///
/// In other words, if a header is referenced more than this number of times, it will be evicted to
/// allow for reprocessing.
const INVALID_HEADER_HIT_EVICTION_THRESHOLD: u8 = 128;
/// Keeps track of invalid headers.
#[derive(Debug)]
pub struct InvalidHeaderCache {
/// This maps a header hash to a reference to its invalid ancestor.
headers: LruMap<B256, HeaderEntry>,
/// Number of cache hits before an invalid header entry is evicted and reprocessed.
hit_eviction_threshold: u8,
/// Metrics for the cache.
metrics: InvalidHeaderCacheMetrics,
}
impl InvalidHeaderCache {
/// Invalid header cache constructor.
pub fn new(max_length: u32) -> Self {
Self { headers: LruMap::new(ByLength::new(max_length)), metrics: Default::default() }
///
/// Setting `hit_eviction_threshold` to `0` effectively disables the cache because entries are
/// evicted on the first lookup.
pub fn new(max_length: u32, hit_eviction_threshold: u8) -> Self {
Self {
headers: LruMap::new(ByLength::new(max_length)),
hit_eviction_threshold,
metrics: Default::default(),
}
}
fn insert_entry(&mut self, hash: B256, header: BlockWithParent) {
@@ -41,7 +44,7 @@ impl InvalidHeaderCache {
{
let entry = self.headers.get(hash)?;
entry.hit_count += 1;
if entry.hit_count < INVALID_HEADER_HIT_EVICTION_THRESHOLD {
if entry.hit_count < self.hit_eviction_threshold {
return Some(entry.header)
}
}
@@ -110,17 +113,28 @@ mod tests {
#[test]
fn test_hit_eviction() {
let mut cache = InvalidHeaderCache::new(10);
let hit_eviction_threshold = 3;
let mut cache = InvalidHeaderCache::new(10, hit_eviction_threshold);
let header = Header::default();
let header = SealedHeader::seal_slow(header);
cache.insert(header.block_with_parent());
assert_eq!(cache.headers.get(&header.hash()).unwrap().hit_count, 0);
for hit in 1..INVALID_HEADER_HIT_EVICTION_THRESHOLD {
for hit in 1..hit_eviction_threshold {
assert!(cache.get(&header.hash()).is_some());
assert_eq!(cache.headers.get(&header.hash()).unwrap().hit_count, hit);
}
assert!(cache.get(&header.hash()).is_none());
}
#[test]
fn test_zero_hit_eviction_threshold_effectively_disables_cache() {
let mut cache = InvalidHeaderCache::new(10, 0);
let header = SealedHeader::seal_slow(Header::default());
cache.insert(header.block_with_parent());
assert!(cache.get(&header.hash()).is_none());
assert_eq!(cache.headers.len(), 0);
}
}

View File

@@ -151,11 +151,15 @@ impl<N: NodePrimitives> EngineApiTreeState<N> {
fn new(
block_buffer_limit: u32,
max_invalid_header_cache_length: u32,
invalid_header_hit_eviction_threshold: u8,
canonical_block: BlockNumHash,
engine_kind: EngineApiKind,
) -> Self {
Self {
invalid_headers: InvalidHeaderCache::new(max_invalid_header_cache_length),
invalid_headers: InvalidHeaderCache::new(
max_invalid_header_cache_length,
invalid_header_hit_eviction_threshold,
),
buffer: BlockBuffer::new(block_buffer_limit),
tree_state: TreeState::new(canonical_block, engine_kind),
forkchoice_state_tracker: ForkchoiceStateTracker::default(),
@@ -436,6 +440,7 @@ where
let state = EngineApiTreeState::new(
config.block_buffer_limit(),
config.max_invalid_header_cache_length(),
config.invalid_header_hit_eviction_threshold(),
header.num_hash(),
kind,
);
@@ -3024,7 +3029,15 @@ where
InsertBlockValidationError::Consensus(err) => self.consensus.is_transient_error(err),
_ => false,
};
if !is_transient {
if is_transient {
warn!(
target: "engine::tree",
invalid_hash=%block.hash(),
invalid_number=block.number(),
%validation_err,
"Skipping invalid header cache insert for transient validation error",
);
} else {
self.state.invalid_headers.insert(block.block_with_parent());
}
self.emit_event(EngineApiEvent::BeaconConsensus(ConsensusEngineEvent::InvalidBlock(

View File

@@ -7,7 +7,7 @@ use crate::tree::{
CacheWaitDurations, CachedStateMetrics, CachedStateMetricsSource, ExecutionCache,
PayloadExecutionCache, SavedCache, StateProviderBuilder, TreeConfig, WaitForCaches,
};
use alloy_eip7928::BlockAccessList;
use alloy_eip7928::bal::DecodedBal;
use alloy_eips::{eip1898::BlockWithParent, eip4895::Withdrawal};
use alloy_primitives::B256;
use crossbeam_channel::{Receiver as CrossbeamReceiver, Sender as CrossbeamSender};
@@ -250,7 +250,6 @@ where
provider_builder: StateProviderBuilder<N, P>,
multiproof_provider_factory: F,
config: &TreeConfig,
bal: Option<Arc<BlockAccessList>>,
) -> IteratorPayloadHandle<Evm, I, N>
where
P: BlockReader + StateProviderFactory + StateReader + Clone + 'static,
@@ -273,13 +272,12 @@ where
halve_workers,
config,
);
let install_state_hook = bal.is_none();
let install_state_hook = env.decoded_bal.is_none();
let prewarm_handle = self.spawn_caching_with(
env,
prewarm_rx,
provider_builder,
Some(state_root_handle.updates_tx().clone()),
bal,
);
PayloadHandle {
@@ -300,14 +298,13 @@ where
env: ExecutionEnv<Evm>,
transactions: I,
provider_builder: StateProviderBuilder<N, P>,
bal: Option<Arc<BlockAccessList>>,
) -> IteratorPayloadHandle<Evm, I, N>
where
P: BlockReader + StateProviderFactory + StateReader + Clone + 'static,
{
let (prewarm_rx, execution_rx) =
self.spawn_tx_iterator(transactions, env.transaction_count);
let prewarm_handle = self.spawn_caching_with(env, prewarm_rx, provider_builder, None, bal);
let prewarm_handle = self.spawn_caching_with(env, prewarm_rx, provider_builder, None);
PayloadHandle {
state_root_handle: None,
install_state_hook: false,
@@ -465,7 +462,7 @@ where
level = "debug",
target = "engine::tree::payload_processor",
skip_all,
fields(bal=%bal.is_some())
fields(bal=%env.decoded_bal.is_some())
)]
fn spawn_caching_with<P>(
&self,
@@ -473,7 +470,6 @@ where
transactions: mpsc::Receiver<(usize, impl ExecutableTxFor<Evm> + Clone + Send + 'static)>,
provider_builder: StateProviderBuilder<N, P>,
to_sparse_trie_task: Option<CrossbeamSender<StateRootMessage>>,
bal: Option<Arc<BlockAccessList>>,
) -> CacheTaskHandle<N::Receipt>
where
P: BlockReader + StateProviderFactory + StateReader + Clone + 'static,
@@ -484,7 +480,7 @@ where
let saved_cache = self.disable_state_cache.not().then(|| self.cache_for(env.parent_hash));
let executed_tx_index = Arc::new(AtomicUsize::new(0));
let maybe_decoded_bal = env.decoded_bal.clone();
// configure prewarming
let prewarm_ctx = PrewarmContext {
env,
@@ -507,15 +503,16 @@ where
prewarm_ctx,
to_sparse_trie_task,
);
{
let to_prewarm_task = to_prewarm_task.clone();
let disable_bal_parallel_execution = self.disable_bal_parallel_execution;
self.executor.spawn_blocking_named("prewarm", move || {
let mode = if skip_prewarm {
PrewarmMode::Skipped
} else if let Some(bal) = bal.filter(|_| !disable_bal_parallel_execution) {
PrewarmMode::BlockAccessList(bal)
} else if let Some(decoded_bal) =
maybe_decoded_bal.filter(|_| !disable_bal_parallel_execution)
{
PrewarmMode::BlockAccessList(decoded_bal)
} else {
PrewarmMode::Transactions(transactions)
};
@@ -936,6 +933,9 @@ pub struct ExecutionEnv<Evm: ConfigureEvm> {
/// Withdrawals included in the block.
/// Used to generate prefetch targets for withdrawal addresses.
pub withdrawals: Option<Vec<Withdrawal>>,
/// Optional decoded BAL for the block.
/// Used to validate and optimize execution.
pub decoded_bal: Option<Arc<DecodedBal>>,
}
impl<Evm: ConfigureEvm> ExecutionEnv<Evm>
@@ -953,6 +953,7 @@ where
transaction_count: 0,
gas_used: 0,
withdrawals: None,
decoded_bal: None,
}
}
}
@@ -974,7 +975,7 @@ mod tests {
use reth_evm_ethereum::EthEvmConfig;
use reth_primitives_traits::{Account, Recovered, StorageEntry};
use reth_provider::{
providers::{BlockchainProvider, OverlayStateProviderFactory},
providers::{BlockchainProvider, OverlayBuilder, OverlayStateProviderFactory},
test_utils::create_test_provider_factory_with_chain_spec,
ChainSpecProvider, HashingWriter,
};
@@ -1158,19 +1159,16 @@ mod tests {
}
}
let account = revm_state::Account {
info: AccountInfo {
balance: U256::from(rng.random::<u64>()),
nonce: rng.random::<u64>(),
code_hash: KECCAK_EMPTY,
code: Some(Default::default()),
account_id: None,
},
original_info: Box::new(AccountInfo::default()),
storage,
status: AccountStatus::Touched,
transaction_id: 0,
let mut account = revm_state::Account::default();
account.info = AccountInfo {
balance: U256::from(rng.random::<u64>()),
nonce: rng.random::<u64>(),
code_hash: KECCAK_EMPTY,
code: Some(Default::default()),
account_id: None,
};
account.storage = storage;
account.status = AccountStatus::Touched;
state_update.insert(address, account);
}
@@ -1249,9 +1247,11 @@ mod tests {
std::convert::identity,
),
StateProviderBuilder::new(provider_factory.clone(), genesis_hash, None),
OverlayStateProviderFactory::new(provider_factory, ChangesetCache::new()),
OverlayStateProviderFactory::new(
provider_factory,
OverlayBuilder::new(ChangesetCache::new()),
),
&TreeConfig::default(),
None, // No BAL for test
);
let mut state_hook = handle.state_hook().expect("state hook is None");

View File

@@ -18,7 +18,7 @@ use crate::tree::{
StateProviderBuilder,
};
use alloy_consensus::transaction::TxHashRef;
use alloy_eip7928::BlockAccessList;
use alloy_eip7928::bal::DecodedBal;
use alloy_eips::eip4895::Withdrawal;
use alloy_primitives::{keccak256, StorageKey, B256};
use crossbeam_channel::Sender as CrossbeamSender;
@@ -48,7 +48,7 @@ pub enum PrewarmMode<Tx> {
/// Prewarm by executing transactions from a stream, each paired with its block index.
Transactions(Receiver<(usize, Tx)>),
/// Prewarm by prefetching slots from a Block Access List.
BlockAccessList(Arc<BlockAccessList>),
BlockAccessList(Arc<DecodedBal>),
/// Transaction prewarming is skipped (e.g. small blocks where the overhead exceeds the
/// benefit). No workers are spawned.
Skipped,
@@ -331,9 +331,10 @@ where
#[instrument(level = "debug", target = "engine::tree::payload_processor::prewarm", skip_all)]
fn run_bal_prewarm(
&self,
bal: Arc<BlockAccessList>,
decoded_bal: Arc<DecodedBal>,
actions_tx: Sender<PrewarmTaskEvent<N::Receipt>>,
) {
let bal = decoded_bal.as_bal();
if bal.is_empty() {
if let Some(to_sparse_trie_task) = self.to_sparse_trie_task.as_ref() {
let _ = to_sparse_trie_task.send(StateRootMessage::FinishedStateUpdates);
@@ -355,8 +356,8 @@ where
let parent_span = Span::current();
let prefetch_parent_span = parent_span.clone();
let stream_parent_span = parent_span;
let prefetch_bal = Arc::clone(&bal);
let stream_bal = Arc::clone(&bal);
let prefetch_bal = Arc::clone(&decoded_bal);
let stream_bal = Arc::clone(&decoded_bal);
let (prefetch_tx, prefetch_rx) = oneshot::channel();
let (stream_tx, stream_rx) = oneshot::channel();
@@ -367,12 +368,12 @@ where
target: "engine::tree::payload_processor::prewarm",
parent: &prefetch_parent_span,
"bal_prefetch_storage",
bal_accounts = prefetch_bal.len(),
bal_accounts = prefetch_bal.as_bal().len(),
);
let provider_parent_span = branch_span.clone();
let _span = branch_span.entered();
prefetch_bal.par_iter().for_each_init(
prefetch_bal.as_bal().par_iter().for_each_init(
|| {
(
prefetch_ctx.clone(),
@@ -400,12 +401,12 @@ where
target: "engine::tree::payload_processor::prewarm",
parent: &stream_parent_span,
"bal_hashed_state_stream",
bal_accounts = stream_bal.len(),
bal_accounts = stream_bal.as_bal().len(),
);
let provider_parent_span = branch_span.clone();
let _span = branch_span.entered();
stream_bal.par_iter().for_each_init(
stream_bal.as_bal().par_iter().for_each_init(
|| (ctx.clone(), None::<Box<dyn AccountReader>>, provider_parent_span.clone()),
|(ctx, provider, parent_span), account_changes| {
ctx.send_bal_hashed_state(

View File

@@ -894,7 +894,8 @@ mod tests {
use super::*;
use alloy_primitives::{keccak256, Address, B256, U256};
use reth_provider::{
providers::OverlayStateProviderFactory, test_utils::create_test_provider_factory,
providers::{OverlayBuilder, OverlayStateProviderFactory},
test_utils::create_test_provider_factory,
};
use reth_trie_db::ChangesetCache;
use reth_trie_parallel::proof_task::ProofTaskCtx;
@@ -983,8 +984,10 @@ mod tests {
fn run_returns_parent_root_without_revealing_blind_trie_when_no_state_updates() {
let runtime = reth_tasks::Runtime::test();
let provider_factory = create_test_provider_factory();
let overlay_factory =
OverlayStateProviderFactory::new(provider_factory, ChangesetCache::new());
let overlay_factory = OverlayStateProviderFactory::new(
provider_factory,
OverlayBuilder::new(ChangesetCache::new()),
);
let proof_worker_handle =
ProofWorkerHandle::new(&runtime, ProofTaskCtx::new(overlay_factory), false);

View File

@@ -48,7 +48,10 @@ use crate::tree::{
PayloadHandle, StateProviderBuilder, StateProviderDatabase, TreeConfig, WaitForCaches,
};
use alloy_consensus::transaction::{Either, TxHashRef};
use alloy_eip7928::{bal::Bal, BlockAccessList};
use alloy_eip7928::{
bal::{Bal, DecodedBal},
BlockAccessList,
};
use alloy_eips::{eip1898::BlockWithParent, eip4895::Withdrawal, NumHash};
use alloy_evm::Evm;
use alloy_primitives::{map::B256Set, B256};
@@ -77,13 +80,14 @@ use reth_primitives_traits::{
RecoveredBlock, SealedBlock, SealedHeader, SignerRecoverable,
};
use reth_provider::{
providers::OverlayStateProviderFactory, BlockExecutionOutput, BlockNumReader, BlockReader,
ChangeSetReader, DatabaseProviderFactory, DatabaseProviderROFactory, HashedPostStateProvider,
ProviderError, PruneCheckpointReader, StageCheckpointReader, StateProvider,
StateProviderFactory, StateReader, StorageChangeSetReader, StorageSettingsCache,
providers::{OverlayBuilder, OverlayStateProviderFactory},
BlockExecutionOutput, BlockNumReader, BlockReader, ChangeSetReader, DatabaseProviderFactory,
DatabaseProviderROFactory, HashedPostStateProvider, ProviderError, PruneCheckpointReader,
StageCheckpointReader, StateProvider, StateProviderBox, StateProviderFactory, StateReader,
StorageChangeSetReader, StorageSettingsCache,
};
use reth_revm::db::{states::bundle_state::BundleRetention, BundleAccount, State};
use reth_trie::{trie_cursor::TrieCursorFactory, updates::TrieUpdates, HashedPostState, StateRoot};
use reth_trie::{trie_cursor::TrieCursorFactory, updates::TrieUpdates, HashedPostState};
use reth_trie_db::ChangesetCache;
use reth_trie_parallel::root::{ParallelStateRoot, ParallelStateRootError};
use revm_primitives::{Address, KECCAK_EMPTY};
@@ -487,6 +491,12 @@ where
.in_scope(|| self.evm_env_for(&input))
.map_err(NewPayloadError::other)?;
// Extract the decoded BAL, if valid and available.
let decoded_bal = ensure_ok!(input
.try_decoded_access_list()
.map_err(|err| { Box::<dyn std::error::Error + Send + Sync>::from(err) }))
.map(Arc::new);
let env = ExecutionEnv {
evm_env,
hash: input.hash(),
@@ -495,6 +505,7 @@ where
transaction_count: input.transaction_count(),
gas_used: input.gas_used(),
withdrawals: input.withdrawals().map(|w| w.to_vec()),
decoded_bal,
};
// Plan the strategy used for state root computation.
@@ -509,33 +520,26 @@ where
// Get an iterator over the transactions in the payload
let txs = self.tx_iterator_for(&input)?;
// Extract the BAL, if valid and available
let block_access_list = ensure_ok!(input
.block_access_list()
.transpose()
// Eventually gets converted to a `InsertBlockErrorKind::Other`
.map_err(Box::<dyn std::error::Error + Send + Sync>::from))
.map(Arc::new);
// Create lazy overlay from ancestors - this doesn't block, allowing execution to start
// before the trie data is ready. The overlay will be computed on first access.
let (lazy_overlay, anchor_hash) = Self::get_parent_lazy_overlay(parent_hash, ctx.state());
// Create overlay factory for payload processor (StateRootTask path needs it for
// multiproofs)
let provider_factory = self.provider.clone();
let overlay_builder = OverlayBuilder::new(self.changeset_cache.clone())
.with_block_hash(Some(anchor_hash))
.with_lazy_overlay(lazy_overlay);
let overlay_factory =
OverlayStateProviderFactory::new(self.provider.clone(), self.changeset_cache.clone())
.with_block_hash(Some(anchor_hash))
.with_lazy_overlay(lazy_overlay);
OverlayStateProviderFactory::new(provider_factory.clone(), overlay_builder.clone());
// Spawn the appropriate processor based on strategy
let mut handle = ensure_ok!(self.spawn_payload_processor(
env.clone(),
txs,
provider_builder,
provider_builder.clone(),
overlay_factory.clone(),
strategy,
block_access_list,
));
// Create optional cache stats for detailed block logging
@@ -664,7 +668,7 @@ where
let task_result = ensure_ok_post_block!(
self.await_state_root_with_timeout(
&mut handle,
overlay_factory.clone(),
provider_builder.clone(),
&hashed_state,
),
block
@@ -688,7 +692,9 @@ where
// Compare trie updates with serial computation if configured
if self.config.always_compare_trie_updates() {
let _has_diff = self.compare_trie_updates_with_serial(
overlay_factory.clone(),
provider_builder.clone(),
provider_factory,
overlay_builder,
&hashed_state,
trie_updates.as_ref().clone(),
);
@@ -727,7 +733,11 @@ where
}
StateRootStrategy::Parallel => {
debug!(target: "engine::tree::payload_validator", "Using parallel state root algorithm");
match self.compute_state_root_parallel(overlay_factory.clone(), &hashed_state) {
match self.compute_state_root_parallel(
provider_factory,
overlay_builder,
&hashed_state,
) {
Ok(result) => {
let elapsed = root_time.elapsed();
info!(
@@ -763,7 +773,9 @@ where
}
let (root, updates) = ensure_ok_post_block!(
Self::compute_state_root_serial(overlay_factory.clone(), &hashed_state),
provider_builder
.build()
.and_then(|provider| Self::compute_state_root_serial(provider, &hashed_state)),
block
);
@@ -1087,7 +1099,8 @@ where
#[instrument(level = "debug", target = "engine::tree::payload_validator", skip_all)]
fn compute_state_root_parallel(
&self,
overlay_factory: OverlayStateProviderFactory<P>,
provider_factory: P,
overlay_builder: OverlayBuilder,
hashed_state: &LazyHashedPostState,
) -> Result<(B256, TrieUpdates), ParallelStateRootError> {
let hashed_state = hashed_state.get();
@@ -1095,34 +1108,24 @@ where
// need to use the prefix sets which were generated from it to indicate to the
// ParallelStateRoot which parts of the trie need to be recomputed.
let prefix_sets = hashed_state.construct_prefix_sets().freeze();
let overlay_factory =
overlay_factory.with_extended_hashed_state_overlay(hashed_state.clone_into_sorted());
let overlay_factory = OverlayStateProviderFactory::new(
provider_factory,
overlay_builder.with_extended_hashed_state_overlay(hashed_state.clone_into_sorted()),
);
ParallelStateRoot::new(overlay_factory, prefix_sets, self.runtime.clone())
.incremental_root_with_updates()
}
/// Compute state root for the given hashed post state in serial.
///
/// Uses an overlay factory which provides the state of the parent block, along with the
/// [`HashedPostState`] containing the changes of this block, to compute the state root and
/// trie updates for this block.
/// Uses the same provider construction path as main execution and computes the state root and
/// trie updates for this block directly via
/// [`reth_provider::StateRootProvider::state_root_with_updates`].
fn compute_state_root_serial(
overlay_factory: OverlayStateProviderFactory<P>,
state_provider: StateProviderBox,
hashed_state: &LazyHashedPostState,
) -> ProviderResult<(B256, TrieUpdates)> {
let hashed_state = hashed_state.get();
// The `hashed_state` argument will be taken into account as part of the overlay, but we
// need to use the prefix sets which were generated from it to indicate to the
// StateRoot which parts of the trie need to be recomputed.
let prefix_sets = hashed_state.construct_prefix_sets().freeze();
let overlay_factory =
overlay_factory.with_extended_hashed_state_overlay(hashed_state.clone_into_sorted());
let provider = overlay_factory.database_provider_ro()?;
Ok(StateRoot::new(&provider, &provider)
.with_prefix_sets(prefix_sets)
.root_with_updates()?)
state_provider.state_root_with_updates(hashed_state.get().clone())
}
/// Awaits the state root from the background task, with an optional timeout fallback.
@@ -1147,7 +1150,7 @@ where
fn await_state_root_with_timeout<Tx, Err, R: Send + Sync + 'static>(
&self,
handle: &mut PayloadHandle<Tx, Err, R>,
overlay_factory: OverlayStateProviderFactory<P>,
state_provider_builder: StateProviderBuilder<N, P>,
hashed_state: &LazyHashedPostState,
) -> ProviderResult<Result<StateRootComputeOutcome, ParallelStateRootError>> {
let Some(timeout) = self.config.state_root_task_timeout() else {
@@ -1172,10 +1175,11 @@ where
let (seq_tx, seq_rx) =
std::sync::mpsc::channel::<ProviderResult<(B256, TrieUpdates)>>();
let seq_overlay = overlay_factory;
let seq_hashed_state = hashed_state.clone();
self.payload_processor.executor().spawn_blocking_named("serial-root", move || {
let result = Self::compute_state_root_serial(seq_overlay, &seq_hashed_state);
let result = state_provider_builder.build().and_then(|provider| {
Self::compute_state_root_serial(provider, &seq_hashed_state)
});
let _ = seq_tx.send(result);
});
@@ -1239,13 +1243,18 @@ where
/// updates.
fn compare_trie_updates_with_serial(
&self,
overlay_factory: OverlayStateProviderFactory<P>,
state_provider_builder: StateProviderBuilder<N, P>,
provider_factory: P,
overlay_builder: OverlayBuilder,
hashed_state: &LazyHashedPostState,
task_trie_updates: TrieUpdates,
) -> bool {
debug!(target: "engine::tree::payload_validator", "Comparing trie updates with serial computation");
match Self::compute_state_root_serial(overlay_factory.clone(), hashed_state) {
match state_provider_builder
.build()
.and_then(|provider| Self::compute_state_root_serial(provider, hashed_state))
{
Ok((serial_root, serial_trie_updates)) => {
debug!(
target: "engine::tree::payload_validator",
@@ -1254,6 +1263,8 @@ where
);
// Get a database provider to use as trie cursor factory
let overlay_factory =
OverlayStateProviderFactory::new(provider_factory, overlay_builder);
match overlay_factory.database_provider_ro() {
Ok(provider) => {
match super::trie_updates::compare_trie_updates(
@@ -1439,7 +1450,6 @@ where
provider_builder: StateProviderBuilder<N, P>,
overlay_factory: OverlayStateProviderFactory<P>,
strategy: StateRootStrategy,
block_access_list: Option<Arc<BlockAccessList>>,
) -> Result<
PayloadHandle<
impl ExecutableTxFor<Evm> + use<N, P, Evm, V, T>,
@@ -1459,7 +1469,6 @@ where
provider_builder,
overlay_factory,
&self.config,
block_access_list,
);
// record prewarming initialization duration
@@ -1472,12 +1481,8 @@ where
}
StateRootStrategy::Parallel | StateRootStrategy::Synchronous => {
let start = Instant::now();
let handle = self.payload_processor.spawn_cache_exclusive(
env,
txs,
provider_builder,
block_access_list,
);
let handle =
self.payload_processor.spawn_cache_exclusive(env, txs, provider_builder);
// Record prewarming initialization duration
self.metrics
@@ -2026,10 +2031,12 @@ where
state: &EngineApiTreeState<N>,
) -> Option<StateRootHandle> {
let (lazy_overlay, anchor_hash) = Self::get_parent_lazy_overlay(parent_hash, state);
let overlay_factory =
OverlayStateProviderFactory::new(self.provider.clone(), self.changeset_cache.clone())
let overlay_factory = OverlayStateProviderFactory::new(
self.provider.clone(),
OverlayBuilder::new(self.changeset_cache.clone())
.with_block_hash(Some(anchor_hash))
.with_lazy_overlay(lazy_overlay);
.with_lazy_overlay(lazy_overlay),
);
Some(self.payload_processor.spawn_state_root(
overlay_factory,
@@ -2110,6 +2117,17 @@ impl<T: PayloadTypes> BlockOrPayload<T> {
}
}
/// Returns the decoded block access list, if present and successfully decoded.
pub fn try_decoded_access_list(&self) -> Result<Option<DecodedBal>, alloy_rlp::Error> {
match self {
Self::Payload(payload) => payload
.block_access_list()
.map(|block_access_list| DecodedBal::from_rlp_bytes(block_access_list.clone()))
.transpose(),
Self::Block(_) => Ok(None),
}
}
/// Returns the number of transactions in the payload or block.
pub fn transaction_count(&self) -> usize
where

View File

@@ -184,11 +184,18 @@ impl TestHarness {
let payload_validator = MockEngineValidator;
let (from_tree_tx, from_tree_rx) = unbounded_channel();
let tree_config =
TreeConfig::default().with_legacy_state_root(false).with_has_enough_parallelism(true);
let header = chain_spec.genesis_header().clone();
let header = SealedHeader::seal_slow(header);
let engine_api_tree_state =
EngineApiTreeState::new(10, 10, header.num_hash(), EngineApiKind::Ethereum);
let engine_api_tree_state = EngineApiTreeState::new(
10,
10,
tree_config.invalid_header_hit_eviction_threshold(),
header.num_hash(),
EngineApiKind::Ethereum,
);
let canonical_in_memory_state = CanonicalInMemoryState::with_head(header, None, None);
let (to_payload_service, _payload_command_rx) = unbounded_channel();
@@ -217,8 +224,7 @@ impl TestHarness {
persistence_handle,
PersistenceState { last_persisted_block: BlockNumHash::default(), rx: None },
payload_builder,
// always assume enough parallelism for tests
TreeConfig::default().with_legacy_state_root(false).with_has_enough_parallelism(true),
tree_config,
EngineApiKind::Ethereum,
evm_config,
changeset_cache,

View File

@@ -2,8 +2,8 @@
//! and injecting them into era1 files with `Era1Writer`.
use crate::calculate_td_by_number;
use alloy_consensus::BlockHeader;
use alloy_primitives::{BlockNumber, B256, U256};
use alloy_consensus::{BlockHeader, Sealable, TxReceipt};
use alloy_primitives::{BlockNumber, U256};
use eyre::{eyre, Result};
use reth_era::{
common::file_ops::{EraFileId, StreamWriter},
@@ -13,7 +13,7 @@ use reth_era::{
types::{
execution::{
Accumulator, BlockTuple, CompressedBody, CompressedHeader, CompressedReceipts,
TotalDifficulty, MAX_BLOCKS_PER_ERA1,
HeaderRecord, TotalDifficulty, MAX_BLOCKS_PER_ERA1,
},
group::{BlockIndex, Era1Id},
},
@@ -139,17 +139,21 @@ where
let headers = provider.headers_range(start_block..=end_block)?;
// Extract first 4 bytes of last block's state root as historical identifier
let historical_root = headers
.last()
.map(|header| {
let state_root = header.state_root();
[state_root[0], state_root[1], state_root[2], state_root[3]]
// Pre-compute accumulator from headers to determine filename
let mut precompute_td = total_difficulty;
let header_records: Vec<HeaderRecord> = headers
.iter()
.map(|h| {
precompute_td += h.difficulty();
HeaderRecord { block_hash: h.hash_slow(), total_difficulty: precompute_td }
})
.unwrap_or([0u8; 4]);
.collect();
let accumulator = Accumulator::from_header_records(&header_records)
.map_err(|e| eyre!("Failed to compute accumulator: {e}"))?;
let file_hash: [u8; 4] = accumulator.root[..4].try_into().unwrap();
let era1_id = Era1Id::new(&config.network, start_block, block_count as u32)
.with_hash(historical_root);
let era1_id =
Era1Id::new(&config.network, start_block, block_count as u32).with_hash(file_hash);
let era1_id = if config.max_blocks_per_file == MAX_BLOCKS_PER_ERA1 as u64 {
era1_id
@@ -166,7 +170,6 @@ where
let mut offsets = Vec::<i64>::with_capacity(block_count);
let mut position = VERSION_ENTRY_SIZE as i64;
let mut blocks_written = 0;
let mut final_header_data = Vec::new();
for (i, header) in headers.into_iter().enumerate() {
let expected_block_number = start_block + i as u64;
@@ -178,11 +181,6 @@ where
&mut total_difficulty,
)?;
// Save last block's header data for accumulator
if expected_block_number == end_block {
final_header_data = compressed_header.data.clone();
}
let difficulty = TotalDifficulty::new(total_difficulty);
let header_size = compressed_header.data.len() + ENTRY_HEADER_SIZE;
@@ -218,10 +216,12 @@ where
}
}
if blocks_written > 0 {
let accumulator_hash =
B256::from_slice(&final_header_data[0..32.min(final_header_data.len())]);
let accumulator = Accumulator::new(accumulator_hash);
let block_index = BlockIndex::new(start_block, offsets);
// Convert absolute offsets to relative (measured from block-index entry start)
let accumulator_entry_size = (ENTRY_HEADER_SIZE + 32) as i64;
let block_index_position = position + accumulator_entry_size;
let relative_offsets: Vec<i64> =
offsets.iter().map(|&abs| abs - block_index_position).collect();
let block_index = BlockIndex::new(start_block, relative_offsets);
writer.write_accumulator(&accumulator)?;
writer.write_block_index(&block_index)?;
@@ -310,7 +310,9 @@ where
let compressed_header = CompressedHeader::from_header(&header)?;
let compressed_body = CompressedBody::from_body(&body)?;
let compressed_receipts = CompressedReceipts::from_encodable_list(&receipts)
let receipts_with_bloom: Vec<_> =
receipts.iter().map(|r| TxReceipt::with_bloom_ref(r)).collect();
let compressed_receipts = CompressedReceipts::from_encodable_list(&receipts_with_bloom)
.map_err(|e| eyre!("Failed to compress receipts: {}", e))?;
Ok((compressed_header, compressed_body, compressed_receipts))

View File

@@ -24,6 +24,7 @@ snap.workspace = true
# ssz encoding and decoding
ethereum_ssz.workspace = true
ethereum_ssz_derive.workspace = true
sha2.workspace = true
[dev-dependencies]
eyre.workspace = true

View File

@@ -76,6 +76,7 @@ use crate::{
use alloy_consensus::{Block, BlockBody, Header};
use alloy_primitives::{B256, U256};
use alloy_rlp::{Decodable, Encodable};
use sha2::{Digest, Sha256};
use snap::{read::FrameDecoder, write::FrameEncoder};
use std::{
io::{Read, Write},
@@ -493,6 +494,73 @@ impl Accumulator {
Ok(Self { root: B256::from(root) })
}
/// Compute the accumulator from a list of header records.
///
/// Implements `hash_tree_root(List[HeaderRecord, 8192])` per the ERA1 spec:
/// - Each leaf is `sha256(block_hash || total_difficulty_le_bytes32)`
/// - Leaves are padded to `MAX_BLOCKS_PER_ERA1` (8192) with zero hashes
/// - Binary Merkle tree is computed bottom-up
/// - Final root is `sha256(merkle_root || le_bytes32(actual_count))`
///
/// Returns `Err` if `records` exceeds [`MAX_BLOCKS_PER_ERA1`].
pub fn from_header_records(records: &[HeaderRecord]) -> Result<Self, E2sError> {
let capacity = MAX_BLOCKS_PER_ERA1;
if records.len() > capacity {
return Err(E2sError::Ssz(format!(
"Too many header records: got {}, max {}",
records.len(),
capacity
)));
}
// Compute leaf hash for each header record
let mut leaves = Vec::with_capacity(capacity);
for record in records {
let mut data = [0u8; 64];
data[..32].copy_from_slice(record.block_hash.as_slice());
data[32..].copy_from_slice(&record.total_difficulty.to_le_bytes::<32>());
leaves.push(<[u8; 32]>::from(Sha256::digest(data)));
}
// Pad to capacity with zero hashes
leaves.resize(capacity, [0u8; 32]);
// Binary Merkle tree bottom-up (capacity is always a power of two)
while leaves.len() > 1 {
let mut next_level = Vec::with_capacity(leaves.len() / 2);
for pair in leaves.chunks_exact(2) {
let mut data = [0u8; 64];
data[..32].copy_from_slice(&pair[0]);
data[32..].copy_from_slice(&pair[1]);
next_level.push(<[u8; 32]>::from(Sha256::digest(data)));
}
leaves = next_level;
}
let merkle_root = leaves[0];
// mix_in_length: sha256(merkle_root || le_bytes32(actual_length))
let mut mix = [0u8; 64];
mix[..32].copy_from_slice(&merkle_root);
let length = records.len() as u64;
mix[32..40].copy_from_slice(&length.to_le_bytes());
// remaining bytes stay zero (uint256 LE padding)
Ok(Self { root: B256::from(<[u8; 32]>::from(Sha256::digest(mix))) })
}
}
/// A header record used to compute the ERA1 accumulator.
///
/// Per the ERA1 spec: `header-record := { block-hash: Bytes32, total-difficulty: Uint256 }`
#[derive(Debug, Clone)]
pub struct HeaderRecord {
/// The block hash (keccak256 of RLP-encoded header)
pub block_hash: B256,
/// The cumulative total difficulty at this block
pub total_difficulty: U256,
}
/// A block tuple in an Era1 file, containing all components for a single block
@@ -691,6 +759,44 @@ mod tests {
}
}
#[test]
fn test_accumulator_from_header_records_known_vectors() {
// Known-answer vectors computed from the SSZ spec:
// hash_tree_root(List[HeaderRecord, 8192])
let expected_empty: B256 =
"4a8c3a07c8d23adc5bac61157555c3c784d53d9bc110c1370809bd23cd93777d".parse().unwrap();
let expected_single_zero: B256 =
"81fd641249670887a731386e756a7a1538dc781b1b0bf016889045d350812817".parse().unwrap();
let expected_single_nonzero: B256 =
"ada35c48d81117f4fd588554cd4c4752356336e84cb41106dea1ceb4cfac8799".parse().unwrap();
// Empty list
let acc_empty = Accumulator::from_header_records(&[]).unwrap();
assert_eq!(acc_empty.root, expected_empty);
// Single record with zero values
let records = vec![HeaderRecord { block_hash: B256::ZERO, total_difficulty: U256::ZERO }];
let acc = Accumulator::from_header_records(&records).unwrap();
assert_eq!(acc.root, expected_single_zero);
// Single record with non-zero values
let records2 = vec![HeaderRecord {
block_hash: B256::from([1u8; 32]),
total_difficulty: U256::from(100u64),
}];
let acc2 = Accumulator::from_header_records(&records2).unwrap();
assert_eq!(acc2.root, expected_single_nonzero);
}
#[test]
fn test_accumulator_rejects_oversized_input() {
let records = vec![
HeaderRecord { block_hash: B256::ZERO, total_difficulty: U256::ZERO };
MAX_BLOCKS_PER_ERA1 + 1
];
assert!(Accumulator::from_header_records(&records).is_err());
}
#[test]
fn test_receipt_list_compression() {
let receipts = create_test_receipts();

View File

@@ -102,8 +102,8 @@ pub struct Era1Id {
/// Number of blocks in the file
pub block_count: u32,
/// Optional hash identifier for this file
/// First 4 bytes of the last historical root in the last state in the era file
/// Optional hash identifier for this file.
/// First 4 bytes of the accumulator root hash.
pub hash: Option<[u8; 4]>,
/// Whether to include era count in filename

View File

@@ -48,7 +48,7 @@ tokio.workspace = true
# revm with required ethereum features
# Note: this must be kept to ensure all features are properly enabled/forwarded
revm = { workspace = true, features = ["secp256k1", "blst", "c-kzg", "memory_limit"] }
revm = { workspace = true, features = ["secp256k1", "blst", "c-kzg", "memory_limit", "p256-aws-lc-rs"] }
# misc
eyre.workspace = true

View File

@@ -225,7 +225,7 @@ impl Discv5 {
bootstrap_lookup_interval,
bootstrap_lookup_countdown,
metrics.clone(),
discv5.clone(),
Arc::downgrade(&discv5),
);
Ok((
@@ -573,14 +573,19 @@ pub fn spawn_populate_kbuckets_bg(
bootstrap_lookup_interval: u64,
bootstrap_lookup_countdown: u64,
metrics: Discv5Metrics,
discv5: Arc<discv5::Discv5>,
discv5: std::sync::Weak<discv5::Discv5>,
) {
let local_node_id = discv5.local_enr().node_id();
let lookup_interval = Duration::from_secs(lookup_interval);
let metrics = metrics.discovered_peers;
let mut kbucket_index = MAX_KBUCKET_INDEX;
let pulse_lookup_interval = Duration::from_secs(bootstrap_lookup_interval);
task::spawn(async move {
let Some(discv5_handle) = discv5.upgrade() else {
return;
};
let local_node_id = discv5_handle.local_enr().node_id();
drop(discv5_handle);
// make many fast lookup queries at bootstrap, trying to fill kbuckets at furthest
// log2distance from local node
for i in (0..bootstrap_lookup_countdown).rev() {
@@ -593,7 +598,12 @@ pub fn spawn_populate_kbuckets_bg(
"starting bootstrap boost lookup query"
);
lookup(target, &discv5, &metrics).await;
{
let Some(discv5_handle) = discv5.upgrade() else {
return;
};
lookup(target, &discv5_handle, &metrics).await;
}
tokio::time::sleep(pulse_lookup_interval).await;
}
@@ -610,7 +620,12 @@ pub fn spawn_populate_kbuckets_bg(
"starting periodic lookup query"
);
lookup(target, &discv5, &metrics).await;
{
let Some(discv5_handle) = discv5.upgrade() else {
return;
};
lookup(target, &discv5_handle, &metrics).await;
}
if kbucket_index > DEFAULT_MIN_TARGET_KBUCKET_INDEX {
// try to populate bucket one step closer
@@ -698,6 +713,10 @@ mod test {
use ::enr::{CombinedKey, EnrKey};
use rand_08::thread_rng;
use reth_chainspec::MAINNET;
use std::{
net::UdpSocket,
time::{Duration, Instant},
};
use tracing::trace;
fn discv5_noop() -> Discv5 {
@@ -736,6 +755,61 @@ mod test {
Discv5::start(&secret_key, discv5_config).await.expect("should build discv5")
}
async fn start_discovery_node_with_key(
secret_key: &SecretKey,
udp_port_discv5: u16,
) -> Result<(Discv5, mpsc::Receiver<discv5::Event>), Error> {
let discv5_addr: SocketAddr = format!("127.0.0.1:{udp_port_discv5}").parse().unwrap();
let rlpx_addr: SocketAddr = "127.0.0.1:30303".parse().unwrap();
let discv5_listen_config = ListenConfig::from(discv5_addr);
let discv5_config = Config::builder(rlpx_addr)
.discv5_config(discv5::ConfigBuilder::new(discv5_listen_config).build())
.build();
Discv5::start(secret_key, discv5_config).await
}
fn unused_udp_port() -> u16 {
UdpSocket::bind("127.0.0.1:0").unwrap().local_addr().unwrap().port()
}
async fn wait_for_udp_port_release(port: u16, timeout: Duration) {
let deadline = Instant::now() + timeout;
loop {
match UdpSocket::bind(("127.0.0.1", port)) {
Ok(socket) => {
drop(socket);
return;
}
Err(err) if Instant::now() < deadline => {
trace!(target: "net::discv5::test", %port, %err, "waiting for discv5 port release");
tokio::time::sleep(Duration::from_millis(10)).await;
}
Err(err) => panic!("discv5 did not release port {port} before timeout: {err}"),
}
}
}
#[tokio::test(flavor = "multi_thread")]
async fn discv5_releases_port_on_drop() {
reth_tracing::init_test_tracing();
let secret_key = SecretKey::new(&mut thread_rng());
let port = unused_udp_port();
let (node, updates) =
start_discovery_node_with_key(&secret_key, port).await.expect("should start discv5");
drop(updates);
drop(node);
wait_for_udp_port_release(port, Duration::from_secs(1)).await;
let restarted = start_discovery_node_with_key(&secret_key, port).await;
assert!(restarted.is_ok(), "discv5 failed to rebind dropped port: {restarted:?}");
}
#[tokio::test(flavor = "multi_thread")]
async fn discv5() {
reth_tracing::init_test_tracing();

View File

@@ -7,7 +7,7 @@
use crate::{
errors::{EthHandshakeError, EthStreamError},
handshake::EthereumEthHandshake,
message::{EthBroadcastMessage, ProtocolBroadcastMessage, MAX_MESSAGE_SIZE},
message::{EthBroadcastMessage, EthMessageID, ProtocolBroadcastMessage, MAX_MESSAGE_SIZE},
p2pstream::HANDSHAKE_TIMEOUT,
CanDisconnect, DisconnectReason, EthMessage, EthNetworkPrimitives, EthVersion, ProtocolMessage,
UnifiedStatus,
@@ -108,6 +108,9 @@ pub struct EthStreamInner<N> {
version: EthVersion,
/// Maximum allowed ETH message size.
max_message_size: usize,
/// When true, `NewBlock` (0x07) and `NewBlockHashes` (0x01) messages are rejected before RLP
/// decoding to avoid any memory impact for non-PoW networks.
reject_block_announcements: bool,
_pd: std::marker::PhantomData<N>,
}
@@ -122,7 +125,12 @@ where
/// Creates a new [`EthStreamInner`] with the given eth version and message size limit.
pub const fn with_max_message_size(version: EthVersion, max_message_size: usize) -> Self {
Self { version, max_message_size, _pd: std::marker::PhantomData }
Self {
version,
max_message_size,
reject_block_announcements: false,
_pd: std::marker::PhantomData,
}
}
/// Returns the eth version
@@ -131,12 +139,25 @@ where
self.version
}
/// Sets whether to reject block announcement messages (`NewBlock`, `NewBlockHashes`) before
/// RLP decoding.
pub const fn set_reject_block_announcements(&mut self, reject: bool) {
self.reject_block_announcements = reject;
}
/// Decodes incoming bytes into an [`EthMessage`].
pub fn decode_message(&self, bytes: BytesMut) -> Result<EthMessage<N>, EthStreamError> {
if bytes.len() > self.max_message_size {
return Err(EthStreamError::MessageTooBig(bytes.len()));
}
if self.reject_block_announcements &&
let Some(&id) = bytes.first() &&
(id == EthMessageID::NewBlock.to_u8() || id == EthMessageID::NewBlockHashes.to_u8())
{
return Err(EthStreamError::UnsupportedMessage { message_id: id });
}
let msg = match ProtocolMessage::decode_message(self.version, &mut bytes.as_ref()) {
Ok(m) => m,
Err(err) => {
@@ -208,6 +229,12 @@ impl<S, N: NetworkPrimitives> EthStream<S, N> {
self.eth.version()
}
/// Sets whether to reject block announcement messages (`NewBlock`, `NewBlockHashes`) before
/// RLP decoding.
pub const fn set_reject_block_announcements(&mut self, reject: bool) {
self.eth.set_reject_block_announcements(reject);
}
/// Returns the underlying stream.
#[inline]
pub const fn inner(&self) -> &S {

View File

@@ -13,6 +13,7 @@ use crate::{
};
use reth_eth_wire::{EthNetworkPrimitives, NetworkPrimitives};
use reth_network_api::test_utils::PeersHandleProvider;
use reth_storage_api::BalProvider;
use reth_transaction_pool::TransactionPool;
use tokio::sync::mpsc;
@@ -63,7 +64,10 @@ impl<Tx, Eth, N: NetworkPrimitives> NetworkBuilder<Tx, Eth, N> {
pub fn request_handler<Client>(
self,
client: Client,
) -> NetworkBuilder<Tx, EthRequestHandler<Client, N>, N> {
) -> NetworkBuilder<Tx, EthRequestHandler<Client, N>, N>
where
Client: BalProvider,
{
let Self { mut network, transactions, .. } = self;
let (tx, rx) = mpsc::channel(ETH_REQUEST_CHANNEL_CAPACITY);
network.set_eth_request_handler(tx);

View File

@@ -20,7 +20,9 @@ use reth_eth_wire_types::message::MAX_MESSAGE_SIZE;
use reth_ethereum_forks::{ForkFilter, Head};
use reth_network_peers::{mainnet_nodes, pk2id, sepolia_nodes, PeerId, TrustedPeer};
use reth_network_types::{PeersConfig, SessionsConfig};
use reth_storage_api::{noop::NoopProvider, BlockNumReader, BlockReader, HeaderProvider};
use reth_storage_api::{
noop::NoopProvider, BalProvider, BlockNumReader, BlockReader, HeaderProvider,
};
use reth_tasks::Runtime;
use secp256k1::SECP256K1;
use std::{collections::HashSet, net::SocketAddr, sync::Arc};
@@ -157,7 +159,8 @@ where
impl<C, N> NetworkConfig<C, N>
where
N: NetworkPrimitives,
C: BlockReader<Block = N::Block, Receipt = N::Receipt, Header = N::BlockHeader>
C: BalProvider
+ BlockReader<Block = N::Block, Receipt = N::Receipt, Header = N::BlockHeader>
+ HeaderProvider
+ Clone
+ Unpin

View File

@@ -18,7 +18,7 @@ use reth_network_api::test_utils::PeersHandle;
use reth_network_p2p::error::RequestResult;
use reth_network_peers::PeerId;
use reth_primitives_traits::Block;
use reth_storage_api::{BlockReader, HeaderProvider};
use reth_storage_api::{BalProvider, BlockReader, GetBlockAccessListLimit, HeaderProvider};
use std::{
future::Future,
pin::Pin,
@@ -282,27 +282,6 @@ where
let _ = response.send(Ok(Receipts70 { last_block_incomplete, receipts }));
}
/// Handles [`GetBlockAccessLists`] queries.
///
/// EIP-8159 defines the final `BlockAccessLists` response semantics:
/// <https://eips.ethereum.org/EIPS/eip-8159>
fn on_block_access_lists_request(
&self,
_peer_id: PeerId,
request: GetBlockAccessLists,
response: oneshot::Sender<RequestResult<BlockAccessLists>>,
) {
// TODO: BAL serving is not fully implemented yet. Per EIP-8159, unavailable BALs are
// returned as empty BAL entries while preserving request order, so we currently return
// one RLP-encoded empty BAL (`0xc0`) per requested hash.
let access_lists = request
.0
.into_iter()
.map(|_| Bytes::from_static(&[alloy_rlp::EMPTY_LIST_CODE]))
.collect();
let _ = response.send(Ok(BlockAccessLists(access_lists)));
}
#[inline]
fn get_receipts_response<T, F>(&self, request: GetReceipts, transform_fn: F) -> Vec<Vec<T>>
where
@@ -332,13 +311,55 @@ where
}
}
impl<C, N> EthRequestHandler<C, N>
where
N: NetworkPrimitives,
C: BalProvider,
{
/// Handles [`GetBlockAccessLists`] queries.
///
/// EIP-8159 defines the final `BlockAccessLists` response semantics:
/// <https://eips.ethereum.org/EIPS/eip-8159>
fn on_block_access_lists_request(
&self,
_peer_id: PeerId,
request: GetBlockAccessLists,
response: oneshot::Sender<RequestResult<BlockAccessLists>>,
) {
let limit = GetBlockAccessListLimit::ResponseSizeSoftLimit(SOFT_RESPONSE_LIMIT);
let access_lists = self
.client
.bal_store()
.get_by_hashes_with_limit(&request.0, limit)
.unwrap_or_else(|_| empty_block_access_lists_with_limit(request.0.len(), limit));
let _ = response.send(Ok(BlockAccessLists(access_lists)));
}
}
/// Builds the error fallback response while still enforcing the BAL response soft limit.
fn empty_block_access_lists_with_limit(count: usize, limit: GetBlockAccessListLimit) -> Vec<Bytes> {
let mut out = Vec::with_capacity(count);
let mut size = 0;
for _ in 0..count {
let bal = Bytes::from_static(&[0xc0]);
size += bal.len();
out.push(bal);
if limit.exceeds(size) {
break
}
}
out
}
/// An endless future.
///
/// This should be spawned or used as part of `tokio::select!`.
impl<C, N> Future for EthRequestHandler<C, N>
where
N: NetworkPrimitives,
C: BlockReader<Block = N::Block, Receipt = N::Receipt>
C: BalProvider
+ BlockReader<Block = N::Block, Receipt = N::Receipt>
+ HeaderProvider<Header = N::BlockHeader>
+ Unpin,
{

View File

@@ -6,7 +6,7 @@ use futures::{future, future::Either};
use reth_eth_wire::{BlockAccessLists, EthNetworkPrimitives, NetworkPrimitives};
use reth_network_api::test_utils::PeersHandle;
use reth_network_p2p::{
block_access_lists::client::BlockAccessListsClient,
block_access_lists::client::{BalRequirement, BlockAccessListsClient},
bodies::client::{BodiesClient, BodiesFut},
download::DownloadClient,
error::{PeerRequestResult, RequestError},
@@ -135,11 +135,29 @@ impl<N: NetworkPrimitives> BlockAccessListsClient for FetchClient<N> {
&self,
hashes: Vec<B256>,
priority: Priority,
) -> Self::Output {
self.get_block_access_lists_with_priority_and_requirement(
hashes,
priority,
BalRequirement::Mandatory,
)
}
fn get_block_access_lists_with_priority_and_requirement(
&self,
hashes: Vec<B256>,
priority: Priority,
requirement: BalRequirement,
) -> Self::Output {
let (response, rx) = oneshot::channel();
if self
.request_tx
.send(DownloadRequest::GetBlockAccessLists { request: hashes, response, priority })
.send(DownloadRequest::GetBlockAccessLists {
request: hashes,
response,
priority,
requirement,
})
.is_ok()
{
Box::pin(FlattenedResponse::from(rx))

View File

@@ -13,6 +13,7 @@ use reth_eth_wire::{
};
use reth_network_api::test_utils::PeersHandle;
use reth_network_p2p::{
block_access_lists::client::BalRequirement,
error::{EthResponseValidator, PeerRequestResult, RequestError, RequestResult},
headers::client::HeadersRequest,
priority::Priority,
@@ -159,15 +160,10 @@ impl<N: NetworkPrimitives> StateFetcher<N> {
/// full history available
fn next_best_peer(&self, requirement: BestPeerRequirements) -> Option<PeerId> {
// filter out peers that aren't idle or don't meet the requirement
let mut idle = self.peers.iter().filter(|(_, peer)| {
peer.state.is_idle() &&
match &requirement {
BestPeerRequirements::EthVersion(ver) => {
peer.capabilities.supports_eth_at_least(ver)
}
_ => true,
}
});
let mut idle = self
.peers
.iter()
.filter(|(_, peer)| peer.state.is_idle() && peer.satisfies(&requirement));
let mut best_peer = idle.next()?;
@@ -195,6 +191,14 @@ impl<N: NetworkPrimitives> StateFetcher<N> {
Some(*best_peer.0)
}
/// Returns whether any connected peer can serve BAL requests.
fn has_eth71_peer(&self) -> bool {
self.peers.values().any(|peer| {
!matches!(peer.state, PeerState::Closing) &&
peer.capabilities.supports_eth_at_least(&EthVersion::Eth71)
})
}
/// Returns the next action to return
fn poll_action(&mut self) -> PollAction {
// we only check and not pop here since we don't know yet whether a peer is available.
@@ -208,9 +212,15 @@ impl<N: NetworkPrimitives> StateFetcher<N> {
let request = self.queued_requests.pop_front().expect("not empty");
let Some(peer_id) = self.next_best_peer(request.best_peer_requirements()) else {
// no peer matches this request's requirements; requeue at the back so other
// queued requests get a chance on the next poll instead of head-of-line blocking.
self.queued_requests.push_back(request);
// Optional BAL requests can lose their eth/71 peer while queued; complete them
// instead of waiting for future peer churn.
if request.is_optional_bal() && !self.has_eth71_peer() {
request.send_err_response(RequestError::UnsupportedCapability);
} else {
// no peer matches this request's requirements; requeue at the back so other
// queued requests get a chance on the next poll instead of head-of-line blocking.
self.queued_requests.push_back(request);
}
return PollAction::NoPeersAvailable
};
@@ -232,21 +242,30 @@ impl<N: NetworkPrimitives> StateFetcher<N> {
loop {
// poll incoming requests
match self.download_requests_rx.poll_next_unpin(cx) {
Poll::Ready(Some(request)) => match request.get_priority() {
Priority::High => {
// find the first normal request and queue before, add this request to
// the back of the high-priority queue
let pos = self
.queued_requests
.iter()
.position(|req| req.is_normal_priority())
.unwrap_or(0);
self.queued_requests.insert(pos, request);
Poll::Ready(Some(request)) => {
// Optional BAL requests should not wait for future peer churn if no
// connected peer can serve them right now.
if request.is_optional_bal() && !self.has_eth71_peer() {
request.send_err_response(RequestError::UnsupportedCapability);
continue
}
Priority::Normal => {
self.queued_requests.push_back(request);
match request.get_priority() {
Priority::High => {
// find first normal request and queue before it; add this request
// to the back of the high-priority queue
let pos = self
.queued_requests
.iter()
.position(|req| req.is_normal_priority())
.unwrap_or(0);
self.queued_requests.insert(pos, request);
}
Priority::Normal => {
self.queued_requests.push_back(request);
}
}
},
}
Poll::Ready(None) => {
unreachable!("channel can't close")
}
@@ -269,6 +288,15 @@ impl<N: NetworkPrimitives> StateFetcher<N> {
peer.state = req.peer_state();
}
self.prepare_inflight_block_request(peer_id, req)
}
/// Tracks an inflight request and converts it into a peer request.
fn prepare_inflight_block_request(
&mut self,
peer_id: PeerId,
req: DownloadRequest<N>,
) -> BlockRequest {
match req {
DownloadRequest::GetBlockHeaders { request, response, .. } => {
let inflight = Request { request: request.clone(), response };
@@ -299,12 +327,23 @@ impl<N: NetworkPrimitives> StateFetcher<N> {
}
}
/// Returns a new followup request for the peer.
/// Returns a queued followup request the peer can serve.
///
/// This is an immediate scheduling shortcut after a successful response. It skips queued
/// requests whose hard requirements do not match this peer, leaving them for the regular peer
/// selection path.
///
/// Caution: this expects that the peer is _not_ closed.
fn followup_request(&mut self, peer_id: PeerId) -> Option<BlockResponseOutcome> {
let req = self.queued_requests.pop_front()?;
let req = self.prepare_block_request(peer_id, req);
let peer = self.peers.get_mut(&peer_id)?;
let req_idx = self.queued_requests.iter().position(|req| {
// Find the first queued request this peer can serve.
peer.satisfies(&req.best_peer_requirements())
})?;
let req = self.queued_requests.remove(req_idx).expect("valid request index");
peer.state = req.peer_state();
let req = self.prepare_inflight_block_request(peer_id, req);
Some(BlockResponseOutcome::Request(peer_id, req))
}
@@ -476,6 +515,16 @@ impl Peer {
self.range_info.as_ref().map(|info| info.range())
}
/// Returns whether this peer can serve requests with the given hard requirements.
fn satisfies(&self, requirement: &BestPeerRequirements) -> bool {
match requirement {
BestPeerRequirements::EthVersion(ver) => self.capabilities.supports_eth_at_least(ver),
BestPeerRequirements::None |
BestPeerRequirements::FullBlock |
BestPeerRequirements::FullBlockRange(_) => true,
}
}
/// Returns true if this peer has a better range than the other peer for serving the requested
/// range.
///
@@ -602,6 +651,7 @@ pub(crate) enum DownloadRequest<N: NetworkPrimitives> {
request: Vec<B256>,
response: oneshot::Sender<PeerRequestResult<BlockAccessLists>>,
priority: Priority,
requirement: BalRequirement,
},
/// Download receipts for the given block hashes and send response through channel
GetReceipts {
@@ -639,6 +689,21 @@ impl<N: NetworkPrimitives> DownloadRequest<N> {
self.get_priority().is_normal()
}
/// Returns `true` if this is an optional BAL request.
const fn is_optional_bal(&self) -> bool {
matches!(self, Self::GetBlockAccessLists { requirement: BalRequirement::Optional, .. })
}
/// Sends an error response to the waiting caller.
fn send_err_response(self, err: RequestError) {
let _ = match self {
Self::GetBlockHeaders { response, .. } => response.send(Err(err)).ok(),
Self::GetBlockBodies { response, .. } => response.send(Err(err)).ok(),
Self::GetBlockAccessLists { response, .. } => response.send(Err(err)).ok(),
Self::GetReceipts { response, .. } => response.send(Err(err)).ok(),
};
}
/// Returns the best peer requirements for this request.
fn best_peer_requirements(&self) -> BestPeerRequirements {
match self {
@@ -1404,6 +1469,98 @@ mod tests {
assert!(matches!(outcome, Some(BlockResponseOutcome::Request(pid, _)) if pid == peer_id));
}
#[tokio::test]
async fn test_followup_skips_request_peer_cannot_serve() {
let (mut fetcher, peer_id) = fetcher_with_peer();
let peer_71 = B512::random();
let caps_71 = Arc::new(Capabilities::from(vec![Capability::new("eth".into(), 71)]));
fetcher.new_active_peer(
peer_71,
B256::random(),
100,
caps_71,
Arc::new(AtomicU64::new(10)),
None,
);
fetcher.peers.get_mut(&peer_71).expect("peer exists").state = PeerState::GetBlockHeaders;
let (followup_tx, _followup_rx) = oneshot::channel();
fetcher.queued_requests.push_back(DownloadRequest::GetBlockAccessLists {
request: vec![B256::random()],
response: followup_tx,
priority: Priority::Normal,
requirement: BalRequirement::Optional,
});
let _rx = insert_inflight_receipts(&mut fetcher, peer_id);
let resp = ReceiptsResponse::new(vec![vec![]]);
assert!(fetcher.on_receipts_response(peer_id, Ok(resp)).is_none());
assert!(fetcher.peers[&peer_id].state.is_idle());
assert!(!fetcher.inflight_bals_requests.contains_key(&peer_id));
assert!(matches!(
fetcher.queued_requests.front(),
Some(DownloadRequest::GetBlockAccessLists {
requirement: BalRequirement::Optional,
..
})
));
}
#[tokio::test]
async fn test_followup_uses_first_satisfiable_request() {
let (mut fetcher, peer_id) = fetcher_with_peer();
let peer_71 = B512::random();
let caps_71 = Arc::new(Capabilities::from(vec![Capability::new("eth".into(), 71)]));
fetcher.new_active_peer(
peer_71,
B256::random(),
100,
caps_71,
Arc::new(AtomicU64::new(10)),
None,
);
fetcher.peers.get_mut(&peer_71).expect("peer exists").state = PeerState::GetBlockHeaders;
let (bal_tx, _bal_rx) = oneshot::channel();
fetcher.queued_requests.push_back(DownloadRequest::GetBlockAccessLists {
request: vec![B256::random()],
response: bal_tx,
priority: Priority::Normal,
requirement: BalRequirement::Optional,
});
let (bodies_tx, _bodies_rx) = oneshot::channel();
fetcher.queued_requests.push_back(DownloadRequest::GetBlockBodies {
request: vec![B256::random()],
response: bodies_tx,
priority: Priority::Normal,
range_hint: None,
});
let _rx = insert_inflight_receipts(&mut fetcher, peer_id);
let resp = ReceiptsResponse::new(vec![vec![]]);
let outcome = fetcher.on_receipts_response(peer_id, Ok(resp));
assert!(matches!(
outcome,
Some(BlockResponseOutcome::Request(pid, BlockRequest::GetBlockBodies(_))) if pid == peer_id
));
assert!(fetcher.inflight_bodies_requests.contains_key(&peer_id));
assert!(matches!(fetcher.peers[&peer_id].state, PeerState::GetBlockBodies));
assert_eq!(fetcher.queued_requests.len(), 1);
assert!(matches!(
fetcher.queued_requests.front(),
Some(DownloadRequest::GetBlockAccessLists {
requirement: BalRequirement::Optional,
..
})
));
}
#[tokio::test]
async fn test_prepare_block_request_creates_inflight_receipts() {
let (mut fetcher, peer_id) = fetcher_with_peer();
@@ -1541,6 +1698,7 @@ mod tests {
request: vec![],
response: tx,
priority: Priority::Normal,
requirement: BalRequirement::Mandatory,
});
let waker = noop_waker();
@@ -1583,4 +1741,138 @@ mod tests {
assert_eq!(peer_id, peer_71);
}
}
#[tokio::test]
async fn test_optional_bal_request_rejected_without_eth71_peer() {
use futures::task::noop_waker;
use std::task::{Context, Poll};
let manager = PeersManager::new(PeersConfig::default());
let mut fetcher =
StateFetcher::<EthNetworkPrimitives>::new(manager.handle(), Default::default());
let peer_old = B512::random();
let caps_old = Arc::new(Capabilities::new(vec![]));
fetcher.new_active_peer(
peer_old,
B256::random(),
100,
caps_old,
Arc::new(AtomicU64::new(10)),
None,
);
let (tx, rx) = oneshot::channel();
fetcher
.download_requests_tx
.send(DownloadRequest::GetBlockAccessLists {
request: vec![],
response: tx,
priority: Priority::Normal,
requirement: BalRequirement::Optional,
})
.unwrap();
let waker = noop_waker();
let mut cx = Context::from_waker(&waker);
assert!(matches!(fetcher.poll(&mut cx), Poll::Pending));
assert!(fetcher.queued_requests.is_empty());
assert_eq!(rx.await.unwrap().unwrap_err(), RequestError::UnsupportedCapability);
}
#[tokio::test]
async fn test_optional_bal_request_waits_for_busy_eth71_peer() {
use futures::task::noop_waker;
use std::task::{Context, Poll};
let manager = PeersManager::new(PeersConfig::default());
let mut fetcher =
StateFetcher::<EthNetworkPrimitives>::new(manager.handle(), Default::default());
let peer_71 = B512::random();
let caps_71 = Arc::new(Capabilities::from(vec![Capability::new("eth".into(), 71)]));
fetcher.new_active_peer(
peer_71,
B256::random(),
100,
caps_71,
Arc::new(AtomicU64::new(10)),
None,
);
fetcher.peers.get_mut(&peer_71).expect("peer exists").state = PeerState::GetBlockHeaders;
let (tx, _rx) = oneshot::channel();
fetcher
.download_requests_tx
.send(DownloadRequest::GetBlockAccessLists {
request: vec![],
response: tx,
priority: Priority::Normal,
requirement: BalRequirement::Optional,
})
.unwrap();
let waker = noop_waker();
let mut cx = Context::from_waker(&waker);
assert!(matches!(fetcher.poll(&mut cx), Poll::Pending));
assert_eq!(fetcher.queued_requests.len(), 1);
}
#[tokio::test]
async fn test_queued_optional_bal_request_rejected_after_eth71_disconnect() {
use futures::task::noop_waker;
use std::task::{Context, Poll};
let manager = PeersManager::new(PeersConfig::default());
let mut fetcher =
StateFetcher::<EthNetworkPrimitives>::new(manager.handle(), Default::default());
let peer_old = B512::random();
let caps_old = Arc::new(Capabilities::new(vec![]));
fetcher.new_active_peer(
peer_old,
B256::random(),
100,
caps_old,
Arc::new(AtomicU64::new(10)),
None,
);
let peer_71 = B512::random();
let caps_71 = Arc::new(Capabilities::from(vec![Capability::new("eth".into(), 71)]));
fetcher.new_active_peer(
peer_71,
B256::random(),
100,
caps_71,
Arc::new(AtomicU64::new(10)),
None,
);
fetcher.peers.get_mut(&peer_71).expect("peer exists").state = PeerState::GetBlockHeaders;
let (tx, rx) = oneshot::channel();
fetcher
.download_requests_tx
.send(DownloadRequest::GetBlockAccessLists {
request: vec![],
response: tx,
priority: Priority::Normal,
requirement: BalRequirement::Optional,
})
.unwrap();
let waker = noop_waker();
let mut cx = Context::from_waker(&waker);
assert!(matches!(fetcher.poll(&mut cx), Poll::Pending));
assert_eq!(fetcher.queued_requests.len(), 1);
fetcher.on_session_closed(&peer_71);
assert!(matches!(fetcher.poll(&mut cx), Poll::Pending));
assert!(fetcher.queued_requests.is_empty());
assert_eq!(rx.await.unwrap().unwrap_err(), RequestError::UnsupportedCapability);
}
}

View File

@@ -318,6 +318,7 @@ impl<N: NetworkPrimitives> NetworkManager<N> {
extra_protocols,
handshake,
eth_max_message_size,
network_mode.is_stake(),
);
let state = NetworkState::new(

View File

@@ -577,6 +577,9 @@ pub struct AnnouncedTxTypesMetrics {
/// Histogram for tracking frequency of EIP-7702 transaction type
pub(crate) eip7702: Histogram,
/// Histogram for tracking frequency of unknown/other transaction types
pub(crate) other: Histogram,
}
/// Counts the number of transactions by their type in a block or collection.
@@ -599,6 +602,9 @@ pub struct TxTypesCounter {
/// Count of transactions conforming to EIP-7702 (Restricted Storage Windows).
pub(crate) eip7702: usize,
/// Count of unknown/other transaction types not matching any known EIP.
pub(crate) other: usize,
}
impl TxTypesCounter {
@@ -621,6 +627,10 @@ impl TxTypesCounter {
}
}
}
pub(crate) const fn increase_other(&mut self) {
self.other += 1;
}
}
impl AnnouncedTxTypesMetrics {
@@ -632,5 +642,6 @@ impl AnnouncedTxTypesMetrics {
self.eip1559.record(tx_types_counter.eip1559 as f64);
self.eip4844.record(tx_types_counter.eip4844 as f64);
self.eip7702.record(tx_types_counter.eip7702 as f64);
self.other.record(tx_types_counter.other as f64);
}
}

View File

@@ -79,6 +79,9 @@ const TIMEOUT_SCALING: u32 = 3;
/// before reading any more messages from the remote peer, throttling the peer.
const MAX_QUEUED_OUTGOING_RESPONSES: usize = 4;
/// Minimum capacity to retain for buffered incoming requests from the remote peer.
const MIN_RECEIVED_REQUESTS_CAPACITY: usize = 1;
/// Soft limit for the total number of buffered outgoing broadcast items (e.g. transaction hashes).
///
/// Many small broadcast messages carrying a single tx hash each are equivalent in cost to one
@@ -204,8 +207,8 @@ impl<N: NetworkPrimitives> ActiveSession<N> {
/// Shrinks the capacity of the internal buffers.
pub fn shrink_to_fit(&mut self) {
self.received_requests_from_remote.shrink_to_fit();
self.queued_outgoing.shrink_to_fit();
self.received_requests_from_remote.shrink_to(MIN_RECEIVED_REQUESTS_CAPACITY);
self.queued_outgoing.shrink_to(MAX_QUEUED_OUTGOING_RESPONSES);
}
/// Returns how many responses we've currently queued up.
@@ -1090,8 +1093,8 @@ impl<N: NetworkPrimitives> QueuedOutgoingMessages<N> {
self.count.increment(1);
}
pub(crate) fn shrink_to_fit(&mut self) {
self.messages.shrink_to_fit();
pub(crate) fn shrink_to(&mut self, min_capacity: usize) {
self.messages.shrink_to(min_capacity);
}
}

View File

@@ -93,6 +93,15 @@ impl<N: NetworkPrimitives> EthRlpxConnection<N> {
Self::Satellite(conn) => conn.primary_mut().start_send_raw(msg),
}
}
/// Sets whether to reject block announcement messages (`NewBlock`, `NewBlockHashes`) before
/// RLP decoding to avoid memory amplification from deserializing blocks that will be discarded.
pub fn set_reject_block_announcements(&mut self, reject: bool) {
match self {
Self::EthOnly(conn) => conn.set_reject_block_announcements(reject),
Self::Satellite(conn) => conn.primary_mut().set_reject_block_announcements(reject),
}
}
}
impl<N: NetworkPrimitives> From<EthPeerConnection<N>> for EthRlpxConnection<N> {

View File

@@ -123,6 +123,9 @@ pub struct SessionManager<N: NetworkPrimitives> {
/// Shared local range information that gets propagated to active sessions.
/// This represents the range of blocks that this node can serve to other peers.
local_range_info: BlockRangeInfo,
/// When true, block announcement messages (`NewBlock`, `NewBlockHashes`) are rejected before
/// RLP decoding on new sessions to avoid memory amplification.
reject_block_announcements: bool,
}
// === impl SessionManager ===
@@ -140,6 +143,7 @@ impl<N: NetworkPrimitives> SessionManager<N> {
extra_protocols: RlpxSubProtocols,
handshake: Arc<dyn EthRlpxHandshake>,
eth_max_message_size: usize,
reject_block_announcements: bool,
) -> Self {
let (pending_sessions_tx, pending_sessions_rx) = mpsc::channel(config.session_event_buffer);
let (active_session_tx, active_session_rx) = mpsc::channel(config.session_event_buffer);
@@ -176,6 +180,7 @@ impl<N: NetworkPrimitives> SessionManager<N> {
handshake,
eth_max_message_size,
local_range_info,
reject_block_announcements,
}
}
@@ -496,7 +501,7 @@ impl<N: NetworkPrimitives> SessionManager<N> {
local_addr,
peer_id,
capabilities,
conn,
mut conn,
status,
direction,
client_id,
@@ -563,6 +568,10 @@ impl<N: NetworkPrimitives> SessionManager<N> {
BlockRangeInfo::new(update.earliest, update.latest, update.latest_hash)
});
if self.reject_block_announcements {
conn.set_reject_block_announcements(true);
}
let session = ActiveSession {
next_id: 0,
remote_peer_id: peer_id,

View File

@@ -27,7 +27,8 @@ use reth_network_api::{
};
use reth_network_peers::PeerId;
use reth_storage_api::{
noop::NoopProvider, BlockReader, BlockReaderIdExt, HeaderProvider, StateProviderFactory,
noop::NoopProvider, BalProvider, BlockReader, BlockReaderIdExt, HeaderProvider,
StateProviderFactory,
};
use reth_tasks::Runtime;
use reth_tokio_util::EventStream;
@@ -247,6 +248,7 @@ where
Receipt = reth_ethereum_primitives::Receipt,
Header = alloy_consensus::Header,
> + HeaderProvider
+ BalProvider
+ Clone
+ Unpin
+ 'static,
@@ -319,6 +321,7 @@ where
Receipt = reth_ethereum_primitives::Receipt,
Header = alloy_consensus::Header,
> + HeaderProvider
+ BalProvider
+ Unpin
+ 'static,
Pool: TransactionPool<
@@ -462,7 +465,10 @@ where
}
/// Set a new request handler that's connected to the peer's network
pub fn install_request_handler(&mut self) {
pub fn install_request_handler(&mut self)
where
C: BalProvider,
{
let (tx, rx) = channel(ETH_REQUEST_CHANNEL_CAPACITY);
self.network.set_eth_request_handler(tx);
let peers = self.network.peers_handle();
@@ -573,6 +579,7 @@ where
Receipt = reth_ethereum_primitives::Receipt,
Header = alloy_consensus::Header,
> + HeaderProvider
+ BalProvider
+ Unpin
+ 'static,
Pool: TransactionPool<

View File

@@ -700,11 +700,11 @@ impl<Pool: TransactionPool, N: NetworkPrimitives> TransactionsManager<Pool, N> {
}
};
if is_eth68_message &&
let Some((actual_ty_byte, _)) = *metadata_ref_mut &&
let Ok(parsed_tx_type) = TxType::try_from(actual_ty_byte)
{
tx_types_counter.increase_by_tx_type(parsed_tx_type);
if is_eth68_message && let Some((actual_ty_byte, _)) = *metadata_ref_mut {
match TxType::try_from(actual_ty_byte) {
Ok(parsed_tx_type) => tx_types_counter.increase_by_tx_type(parsed_tx_type),
Err(_) => tx_types_counter.increase_other(),
}
}
let decision = self

View File

@@ -2,23 +2,29 @@
//! Tests for eth related requests
use alloy_consensus::Header;
use alloy_primitives::{Bytes, B256};
use rand::Rng;
use reth_eth_wire::{EthVersion, HeadersDirection};
use reth_eth_wire::{BlockAccessLists, EthVersion, GetBlockAccessLists, HeadersDirection};
use reth_ethereum_primitives::Block;
use reth_network::{
test_utils::{NetworkEventStream, PeerConfig, Testnet},
eth_requests::SOFT_RESPONSE_LIMIT,
test_utils::{NetworkEventStream, PeerConfig, Testnet, TestnetHandle},
BlockDownloaderProvider, NetworkEventListenerProvider,
};
use reth_network_api::{NetworkInfo, Peers};
use reth_network_p2p::{
bodies::client::BodiesClient,
error::RequestError,
headers::client::{HeadersClient, HeadersRequest},
BalRequirement, BlockAccessListsClient,
};
use reth_provider::test_utils::MockEthProvider;
use reth_provider::{test_utils::MockEthProvider, BalStoreHandle, InMemoryBalStore};
use reth_transaction_pool::test_utils::{TestPool, TransactionGenerator};
use std::sync::Arc;
use tokio::sync::oneshot;
type BalTestnetHandle = TestnetHandle<Arc<MockEthProvider>, TestPool>;
#[tokio::test(flavor = "multi_thread")]
async fn test_get_body() {
reth_tracing::init_test_tracing();
@@ -526,3 +532,178 @@ async fn test_eth69_get_receipts() {
assert_eq!(receipts_response.0[0][1].cumulative_gas_used, 42000);
}
}
#[tokio::test(flavor = "multi_thread")]
async fn test_eth71_get_block_access_lists() {
reth_tracing::init_test_tracing();
let (net, bal_store) = spawn_eth71_bal_testnet().await;
let hash0 = B256::random();
let hash1 = B256::random();
let hash2 = B256::random();
let bal0 = Bytes::from_static(&[0xc1, 0x01]);
let bal2 = Bytes::from_static(&[0xc1, 0x02]);
bal_store.insert(hash0, 1, bal0.clone()).unwrap();
bal_store.insert(hash2, 3, bal2.clone()).unwrap();
let response = request_block_access_lists(&net, vec![hash0, hash1, hash2]).await;
assert_eq!(
response,
BlockAccessLists(vec![bal0, Bytes::from_static(&[alloy_rlp::EMPTY_LIST_CODE]), bal2,])
);
}
// Ensures BAL responses stop at the soft response limit while keeping the item that crosses it.
#[tokio::test(flavor = "multi_thread")]
async fn test_eth71_get_block_access_lists_respects_response_soft_limit() {
reth_tracing::init_test_tracing();
let (net, bal_store) = spawn_eth71_bal_testnet().await;
let hash0 = B256::random();
let hash1 = B256::random();
let hash2 = B256::random();
let bal0 = raw_bal_with_len(2);
let bal1 = raw_bal_with_len(SOFT_RESPONSE_LIMIT);
let bal2 = raw_bal_with_len(2);
assert!(bal0.len() + bal1.len() > SOFT_RESPONSE_LIMIT);
bal_store.insert(hash0, 1, bal0.clone()).unwrap();
bal_store.insert(hash1, 2, bal1.clone()).unwrap();
bal_store.insert(hash2, 3, bal2).unwrap();
let response = request_block_access_lists(&net, vec![hash0, hash1, hash2]).await;
assert_eq!(response, BlockAccessLists(vec![bal0, bal1]));
}
// Ensures a single BAL larger than the soft limit is still returned.
#[tokio::test(flavor = "multi_thread")]
async fn test_eth71_get_block_access_lists_returns_single_oversized_bal() {
reth_tracing::init_test_tracing();
let (net, bal_store) = spawn_eth71_bal_testnet().await;
let hash0 = B256::random();
let hash1 = B256::random();
let bal0 = raw_bal_with_len(SOFT_RESPONSE_LIMIT + 1);
let bal1 = raw_bal_with_len(2);
bal_store.insert(hash0, 1, bal0.clone()).unwrap();
bal_store.insert(hash1, 2, bal1).unwrap();
let response = request_block_access_lists(&net, vec![hash0, hash1]).await;
assert_eq!(response, BlockAccessLists(vec![bal0]));
}
// Ensures an empty BAL request roundtrips to an empty response.
#[tokio::test(flavor = "multi_thread")]
async fn test_eth71_get_block_access_lists_empty_request() {
reth_tracing::init_test_tracing();
let (net, _) = spawn_eth71_bal_testnet().await;
let response = request_block_access_lists(&net, Vec::new()).await;
assert_eq!(response, BlockAccessLists(Vec::new()));
}
// Ensures the fetch client can request BALs through an eth/71 peer.
#[tokio::test(flavor = "multi_thread")]
async fn test_eth71_fetch_client_get_block_access_lists() {
reth_tracing::init_test_tracing();
let (net, bal_store) = spawn_eth71_bal_testnet().await;
let hash0 = B256::random();
let hash1 = B256::random();
let bal0 = Bytes::from_static(&[0xc1, 0x01]);
bal_store.insert(hash0, 1, bal0.clone()).unwrap();
let fetch = net.peers()[0].network().fetch_client().await.unwrap();
let response = fetch.get_block_access_lists(vec![hash0, hash1]).await.unwrap().into_data();
assert_eq!(
response,
BlockAccessLists(vec![bal0, Bytes::from_static(&[alloy_rlp::EMPTY_LIST_CODE])])
);
}
// Ensures fetch client BAL requests are rejected when no eth/71 peer is available.
#[tokio::test(flavor = "multi_thread")]
async fn test_eth70_fetch_client_rejects_optional_block_access_lists_request() {
reth_tracing::init_test_tracing();
let (net, _) = spawn_bal_testnet([EthVersion::Eth70, EthVersion::Eth70]).await;
let fetch = net.peers()[0].network().fetch_client().await.unwrap();
let err = fetch
.get_block_access_lists_with_requirement(vec![B256::random()], BalRequirement::Optional)
.await
.unwrap_err();
assert_eq!(err, RequestError::UnsupportedCapability);
}
async fn spawn_eth71_bal_testnet() -> (BalTestnetHandle, BalStoreHandle) {
spawn_bal_testnet([EthVersion::Eth71, EthVersion::Eth71]).await
}
// Spawns a BAL testnet with one peer per requested eth protocol version.
async fn spawn_bal_testnet(
versions: impl IntoIterator<Item = EthVersion>,
) -> (BalTestnetHandle, BalStoreHandle) {
let mut mock_provider = MockEthProvider::default();
let bal_store = BalStoreHandle::new(InMemoryBalStore::default());
mock_provider.bal_store = bal_store.clone();
let mock_provider = Arc::new(mock_provider);
let mut net: Testnet<Arc<MockEthProvider>, TestPool> = Testnet::default();
for version in versions {
let peer = PeerConfig::with_protocols(mock_provider.clone(), Some(version.into()));
net.add_peer_with_config(peer).await.unwrap();
}
net.for_each_mut(|peer| peer.install_request_handler());
let net = net.spawn();
net.connect_peers().await;
(net, bal_store)
}
// Sends a GetBlockAccessLists request from peer 0 to peer 1.
async fn request_block_access_lists(net: &BalTestnetHandle, hashes: Vec<B256>) -> BlockAccessLists {
let requester = &net.peers()[0];
let responder = &net.peers()[1];
let (tx, rx) = oneshot::channel();
requester.network().send_request(
*responder.peer_id(),
reth_network::PeerRequest::GetBlockAccessLists {
request: GetBlockAccessLists(hashes),
response: tx,
},
);
rx.await.unwrap().unwrap()
}
// Builds a complete raw RLP list item with the requested encoded byte length.
fn raw_bal_with_len(len: usize) -> Bytes {
assert!(len > 0);
let mut payload_length = len - 1;
loop {
let header_length = alloy_rlp::Header { list: true, payload_length }.length();
let next_payload_length = len.checked_sub(header_length).unwrap();
if next_payload_length == payload_length {
break
}
payload_length = next_payload_length;
}
let mut out = Vec::with_capacity(len);
alloy_rlp::Header { list: true, payload_length }.encode(&mut out);
out.resize(len, alloy_rlp::EMPTY_LIST_CODE);
Bytes::from(out)
}

View File

@@ -4,6 +4,17 @@ use auto_impl::auto_impl;
use futures::Future;
use reth_eth_wire_types::BlockAccessLists;
/// Controls whether a BAL request must wait for a capable peer or may complete early when none are
/// available.
#[derive(Debug, Clone, Copy, Default, PartialEq, Eq)]
pub enum BalRequirement {
/// Keep waiting until an eth/71-capable peer is available.
#[default]
Mandatory,
/// Return early if no connected peer can serve BALs.
Optional,
}
/// A client capable of downloading block access lists.
#[auto_impl(&, Arc, Box)]
pub trait BlockAccessListsClient: DownloadClient {
@@ -12,7 +23,24 @@ pub trait BlockAccessListsClient: DownloadClient {
/// Fetches the block access lists for given hashes.
fn get_block_access_lists(&self, hashes: Vec<B256>) -> Self::Output {
self.get_block_access_lists_with_priority(hashes, Priority::Normal)
self.get_block_access_lists_with_priority_and_requirement(
hashes,
Priority::Normal,
BalRequirement::Mandatory,
)
}
/// Fetches the block access lists for given hashes with the requested BAL availability policy.
fn get_block_access_lists_with_requirement(
&self,
hashes: Vec<B256>,
requirement: BalRequirement,
) -> Self::Output {
self.get_block_access_lists_with_priority_and_requirement(
hashes,
Priority::Normal,
requirement,
)
}
/// Fetches the block access lists for given hashes with priority
@@ -20,5 +48,19 @@ pub trait BlockAccessListsClient: DownloadClient {
&self,
hashes: Vec<B256>,
priority: Priority,
) -> Self::Output {
self.get_block_access_lists_with_priority_and_requirement(
hashes,
priority,
BalRequirement::Mandatory,
)
}
/// Fetches the block access lists for given hashes with priority and BAL availability policy.
fn get_block_access_lists_with_priority_and_requirement(
&self,
hashes: Vec<B256>,
priority: Priority,
requirement: BalRequirement,
) -> Self::Output;
}

View File

@@ -57,8 +57,8 @@ impl<H: BlockHeader> EthResponseValidator for RequestResult<Vec<H>> {
/// [`RequestError::ConnectionDropped`] should be ignored here because this is already handled
/// when the dropped connection is handled.
///
/// [`RequestError::UnsupportedCapability`] is not used yet because we only support active
/// session for eth protocol.
/// [`RequestError::UnsupportedCapability`] is also used for locally rejected optional requests,
/// which should not affect peer reputation.
fn reputation_change_err(&self) -> Option<ReputationChangeKind> {
if let Err(err) = self {
match err {

View File

@@ -1110,10 +1110,11 @@ mod tests {
impl BlockAccessListsClient for FullBlockWithAccessListsClient {
type Output = futures::future::Ready<PeerRequestResult<BlockAccessLists>>;
fn get_block_access_lists_with_priority(
fn get_block_access_lists_with_priority_and_requirement(
&self,
hashes: Vec<B256>,
_priority: Priority,
_requirement: crate::block_access_lists::client::BalRequirement,
) -> Self::Output {
self.access_list_requests.fetch_add(1, Ordering::SeqCst);

View File

@@ -54,7 +54,7 @@ pub mod snap;
#[cfg(any(test, feature = "test-utils"))]
pub mod test_utils;
pub use block_access_lists::client::BlockAccessListsClient;
pub use block_access_lists::client::{BalRequirement, BlockAccessListsClient};
pub use bodies::client::BodiesClient;
pub use headers::client::HeadersClient;
pub use receipts::client::ReceiptsClient;

View File

@@ -167,8 +167,9 @@ impl LaunchContext {
info!(target: "reth::cli", path = ?config_path, "Configuration loaded");
// Update the config with the command line arguments
toml_config.peers.trusted_nodes_only = config.network.trusted_only;
// Update the config with the command line arguments. Only override when the CLI flag is
// set, so the TOML value is preserved when the flag is not passed.
toml_config.peers.trusted_nodes_only |= config.network.trusted_only;
// Merge static file CLI arguments with config file, giving priority to CLI
toml_config.static_files =

View File

@@ -205,10 +205,12 @@ impl EngineNodeLauncher {
ctx.blockchain_db().clone(),
ctx.components().evm_config().clone(),
|| async {
// Create a separate cache for reorg validator (not shared with main engine)
let reorg_cache = ChangesetCache::new();
validator_builder
.build_tree_validator(&add_ons_ctx, engine_tree_config.clone(), reorg_cache)
.build_tree_validator(
&add_ons_ctx,
engine_tree_config.clone(),
changeset_cache.clone(),
)
.await
},
node_config.debug.reorg_frequency,

View File

@@ -46,6 +46,15 @@ pub struct BenchmarkArgs {
)]
pub engine_rpc_url: String,
/// The RPC url to use for non-authenticated node RPC requests.
#[arg(
long,
value_name = "LOCAL_RPC_URL",
verbatim_doc_comment,
default_value = "http://localhost:8545"
)]
pub local_rpc_url: String,
/// The `WebSocket` RPC URL to use for persistence subscriptions.
///
/// If not provided, will attempt to derive from engine-rpc-url by:
@@ -241,6 +250,7 @@ mod tests {
fn test_parse_benchmark_args() {
let default_args = BenchmarkArgs {
engine_rpc_url: "http://localhost:8551".to_string(),
local_rpc_url: "http://localhost:8545".to_string(),
..Default::default()
};
let args = CommandParser::<BenchmarkArgs>::parse_from(["reth-bench"]).args;

View File

@@ -60,12 +60,13 @@ pub struct DatabaseArgs {
value_parser = value_parser!(SyncMode),
)]
pub sync_mode: Option<SyncMode>,
/// Hidden benchmark/debug knob that adds an artificial delay to each MDBX read operation.
/// `RocksDB` block cache size (e.g., 512MB, 4GB).
///
/// This is primarily intended for storage-latency simulation when kernel/device-level
/// shaping cannot represent sub-millisecond delays.
#[arg(long = "db.read-delay", value_parser = humantime::parse_duration, hide = true)]
pub read_delay: Option<Duration>,
/// Controls the size of the in-memory LRU cache for decompressed `RocksDB` blocks.
/// A larger cache reduces repeated decompression of hot blocks, improving read
/// performance for history lookups.
#[arg(long = "db.rocksdb-block-cache-size", value_parser = parse_byte_size)]
pub rocksdb_block_cache_size: Option<usize>,
}
impl DatabaseArgs {
@@ -95,7 +96,6 @@ impl DatabaseArgs {
.with_growth_step(self.growth_step)
.with_max_readers(self.max_readers)
.with_sync_mode(self.sync_mode)
.with_read_delay(self.read_delay)
}
}
@@ -442,12 +442,4 @@ mod tests {
CommandParser::<DatabaseArgs>::try_parse_from(["reth", "--db.sync-mode", "ultra-fast"]);
assert!(result.is_err());
}
#[test]
fn test_command_parser_with_read_delay() {
let cmd =
CommandParser::<DatabaseArgs>::try_parse_from(["reth", "--db.read-delay", "500us"])
.unwrap();
assert_eq!(cmd.args.read_delay, Some(Duration::from_micros(500)));
}
}

View File

@@ -4,8 +4,9 @@ use clap::{builder::Resettable, Args};
use eyre::ensure;
use reth_cli_util::{parse_duration_from_secs_or_ms, parsers::format_duration_as_secs_or_ms};
use reth_engine_primitives::{
TreeConfig, DEFAULT_MULTIPROOF_TASK_CHUNK_SIZE, DEFAULT_PERSISTENCE_BACKPRESSURE_THRESHOLD,
DEFAULT_SPARSE_TRIE_MAX_HOT_ACCOUNTS, DEFAULT_SPARSE_TRIE_MAX_HOT_SLOTS,
TreeConfig, DEFAULT_INVALID_HEADER_HIT_EVICTION_THRESHOLD, DEFAULT_MULTIPROOF_TASK_CHUNK_SIZE,
DEFAULT_PERSISTENCE_BACKPRESSURE_THRESHOLD, DEFAULT_SPARSE_TRIE_MAX_HOT_ACCOUNTS,
DEFAULT_SPARSE_TRIE_MAX_HOT_SLOTS,
};
use std::{sync::OnceLock, time::Duration};
@@ -25,6 +26,7 @@ pub struct DefaultEngineValues {
persistence_threshold: u64,
persistence_backpressure_threshold: u64,
memory_block_buffer_target: u64,
invalid_header_hit_eviction_threshold: u8,
legacy_state_root_task_enabled: bool,
state_cache_disabled: bool,
prewarming_disabled: bool,
@@ -83,6 +85,12 @@ impl DefaultEngineValues {
self
}
/// Set the invalid header cache hit eviction threshold
pub const fn with_invalid_header_hit_eviction_threshold(mut self, v: u8) -> Self {
self.invalid_header_hit_eviction_threshold = v;
self
}
/// Set whether to enable legacy state root task by default
pub const fn with_legacy_state_root_task_enabled(mut self, v: bool) -> Self {
self.legacy_state_root_task_enabled = v;
@@ -255,6 +263,7 @@ impl Default for DefaultEngineValues {
persistence_threshold: DEFAULT_PERSISTENCE_THRESHOLD,
persistence_backpressure_threshold: DEFAULT_PERSISTENCE_BACKPRESSURE_THRESHOLD,
memory_block_buffer_target: DEFAULT_MEMORY_BLOCK_BUFFER_TARGET,
invalid_header_hit_eviction_threshold: DEFAULT_INVALID_HEADER_HIT_EVICTION_THRESHOLD,
legacy_state_root_task_enabled: false,
state_cache_disabled: false,
prewarming_disabled: false,
@@ -309,6 +318,14 @@ pub struct EngineArgs {
#[arg(long = "engine.memory-block-buffer-target", default_value_t = DefaultEngineValues::get_global().memory_block_buffer_target)]
pub memory_block_buffer_target: u64,
/// Configure how many cache hits an invalid header can accumulate before it is evicted and
/// reprocessed.
///
/// Set to `0` to effectively disable the cache because entries are evicted on the first
/// lookup.
#[arg(long = "engine.invalid-header-cache-hit-eviction-threshold", default_value_t = DefaultEngineValues::get_global().invalid_header_hit_eviction_threshold)]
pub invalid_header_hit_eviction_threshold: u8,
/// Enable legacy state root
#[arg(long = "engine.legacy-state-root", default_value_t = DefaultEngineValues::get_global().legacy_state_root_task_enabled)]
pub legacy_state_root_task_enabled: bool,
@@ -530,6 +547,7 @@ impl Default for EngineArgs {
persistence_threshold,
persistence_backpressure_threshold,
memory_block_buffer_target,
invalid_header_hit_eviction_threshold,
legacy_state_root_task_enabled,
state_cache_disabled,
prewarming_disabled,
@@ -562,6 +580,7 @@ impl Default for EngineArgs {
persistence_threshold,
persistence_backpressure_threshold,
memory_block_buffer_target,
invalid_header_hit_eviction_threshold,
legacy_state_root_task_enabled,
state_root_task_compare_updates,
caching_and_prewarming_enabled: true,
@@ -620,6 +639,7 @@ impl EngineArgs {
.with_persistence_threshold(self.persistence_threshold)
.with_persistence_backpressure_threshold(self.persistence_backpressure_threshold)
.with_memory_block_buffer_target(self.memory_block_buffer_target)
.with_invalid_header_hit_eviction_threshold(self.invalid_header_hit_eviction_threshold)
.with_legacy_state_root(self.legacy_state_root_task_enabled)
.without_state_cache(self.state_cache_disabled)
.without_prewarming(self.prewarming_disabled)
@@ -682,6 +702,7 @@ mod tests {
persistence_threshold: 100,
persistence_backpressure_threshold: 101,
memory_block_buffer_target: 50,
invalid_header_hit_eviction_threshold: 7,
legacy_state_root_task_enabled: true,
caching_and_prewarming_enabled: true,
state_cache_disabled: true,
@@ -726,6 +747,8 @@ mod tests {
"101",
"--engine.memory-block-buffer-target",
"50",
"--engine.invalid-header-cache-hit-eviction-threshold",
"7",
"--engine.legacy-state-root",
"--engine.disable-state-cache",
"--engine.disable-prewarming",
@@ -808,6 +831,28 @@ mod tests {
assert_eq!(args.slow_block_threshold, Some(Duration::from_millis(500)));
}
#[test]
fn test_parse_invalid_header_hit_eviction_threshold() {
let args = CommandParser::<EngineArgs>::parse_from(["reth"]).args;
assert_eq!(
args.invalid_header_hit_eviction_threshold,
DEFAULT_INVALID_HEADER_HIT_EVICTION_THRESHOLD
);
assert_eq!(
args.tree_config().invalid_header_hit_eviction_threshold(),
DEFAULT_INVALID_HEADER_HIT_EVICTION_THRESHOLD
);
let args = CommandParser::<EngineArgs>::parse_from([
"reth",
"--engine.invalid-header-cache-hit-eviction-threshold",
"0",
])
.args;
assert_eq!(args.invalid_header_hit_eviction_threshold, 0);
assert_eq!(args.tree_config().invalid_header_hit_eviction_threshold(), 0);
}
#[test]
fn test_parse_share_sparse_trie_flag() {
let args = CommandParser::<EngineArgs>::parse_from(["reth"]).args;

View File

@@ -719,9 +719,16 @@ pub struct DiscoveryArgs {
pub disable_discv4_discovery: bool,
/// Enable Discv5 discovery.
#[arg(long, conflicts_with = "disable_discovery")]
///
/// Discv5 is now enabled by default, so this flag is a no-op and will be removed in a future
/// release.
#[arg(long, conflicts_with = "disable_discovery", hide = true)]
pub enable_discv5_discovery: bool,
/// Disable Discv5 discovery.
#[arg(long, conflicts_with = "disable_discovery")]
pub disable_discv5_discovery: bool,
/// Disable Nat discovery.
#[arg(long, conflicts_with = "disable_discovery")]
pub disable_nat: bool,
@@ -852,21 +859,23 @@ impl DiscoveryArgs {
.bootstrap_lookup_countdown(*discv5_bootstrap_lookup_countdown)
}
/// Returns true if discv5 discovery should be configured
/// Returns true if discv5 discovery should be configured.
///
/// Discv5 is enabled by default and can be disabled with `--disable-discv5-discovery`.
const fn should_enable_discv5(&self) -> bool {
if self.disable_discovery {
if self.disable_discovery || self.disable_discv5_discovery {
return false;
}
self.enable_discv5_discovery ||
self.discv5_addr.is_some() ||
self.discv5_addr_ipv6.is_some()
true
}
/// Set the discovery port to zero, to allow the OS to assign a random unused port when
/// discovery binds to the socket.
/// Set the discovery ports to zero, to allow the OS to assign random unused ports when
/// discovery binds to the sockets.
pub const fn with_unused_discovery_port(mut self) -> Self {
self.port = 0;
self.discv5_port = 0;
self.discv5_port_ipv6 = 0;
self
}
@@ -896,6 +905,7 @@ impl Default for DiscoveryArgs {
disable_dns_discovery: false,
disable_discv4_discovery: false,
enable_discv5_discovery: false,
disable_discv5_discovery: false,
disable_nat: false,
addr: DEFAULT_DISCOVERY_ADDR,
port: DEFAULT_DISCOVERY_PORT,

View File

@@ -115,6 +115,11 @@ pub trait EthState: LoadState + SpawnBlocking {
block_id: Option<BlockId>,
) -> impl Future<Output = Result<HashMap<Address, Vec<B256>>, Self::Error>> + Send {
async move {
if requests.is_empty() {
return Err(Self::Error::from_eth_err(EthApiError::InvalidParams(
"empty request".to_string(),
)));
}
let total_slots: usize = requests.values().map(|slots| slots.len()).sum();
if total_slots > DEFAULT_MAX_STORAGE_VALUES_SLOTS {
return Err(Self::Error::from_eth_err(EthApiError::InvalidParams(

View File

@@ -283,6 +283,7 @@ pub trait Trace: LoadState<Error: FromEvmError<Self::Evm>> + Call {
let block_hash = block.hash();
let block_number = evm_env.block_env.number().saturating_to();
let block_timestamp = evm_env.block_env.timestamp().saturating_to();
let base_fee = evm_env.block_env.basefee();
this.apply_pre_execution_changes(&block, &mut db)?;
@@ -309,8 +310,8 @@ pub trait Trace: LoadState<Error: FromEvmError<Self::Evm>> + Call {
index: Some(idx),
block_hash: Some(block_hash),
block_number: Some(block_number),
block_timestamp: Some(block_timestamp),
base_fee: Some(base_fee),
..Default::default()
};
idx += 1;

View File

@@ -325,15 +325,16 @@ pub trait EthTransactions: LoadTransaction<Provider: BlockReaderIdExt> {
if let Some(block) = self.recovered_block(block_id).await? {
let block_hash = block.hash();
let block_number = block.number();
let block_timestamp = block.timestamp();
let base_fee_per_gas = block.base_fee_per_gas();
if let Some((signer, tx)) = block.transactions_with_sender().nth(index) {
let tx_info = TransactionInfo {
hash: Some(*tx.tx_hash()),
block_hash: Some(block_hash),
block_number: Some(block_number),
block_timestamp: Some(block_timestamp),
base_fee: base_fee_per_gas,
index: Some(index as u64),
..Default::default()
};
return Ok(Some(
@@ -395,6 +396,7 @@ pub trait EthTransactions: LoadTransaction<Provider: BlockReaderIdExt> {
.and_then(|block| {
let block_hash = block.hash();
let block_number = block.number();
let block_timestamp = block.timestamp();
let base_fee_per_gas = block.base_fee_per_gas();
block
@@ -406,9 +408,9 @@ pub trait EthTransactions: LoadTransaction<Provider: BlockReaderIdExt> {
hash: Some(*tx.tx_hash()),
block_hash: Some(block_hash),
block_number: Some(block_number),
block_timestamp: Some(block_timestamp),
base_fee: base_fee_per_gas,
index: Some(index as u64),
..Default::default()
};
Ok(self.converter().fill(tx.clone().with_signer(*signer), tx_info)?)
})
@@ -681,6 +683,7 @@ pub trait LoadTransaction: SpawnBlocking + FullEthApiTypes + RpcNodeCoreExt {
index: meta.index,
block_hash: meta.block_hash,
block_number: meta.block_number,
block_timestamp: meta.timestamp,
base_fee: meta.base_fee,
}));
}

View File

@@ -53,6 +53,7 @@ impl<B: Block, R> CachedTransaction<B, R> {
index: self.tx_index as u64,
block_hash: self.block.hash(),
block_number: self.block.number(),
block_timestamp: self.block.timestamp(),
base_fee: self.block.base_fee_per_gas(),
})
}

View File

@@ -344,17 +344,9 @@ where
}
/// Removes transaction index entries for a reorged block.
///
/// Only removes entries that still point to this block, preserving mappings for transactions
/// that were re-mined in a new canonical block.
fn remove_block_transactions(&mut self, block: &RecoveredBlock<Provider::Block>) {
let block_hash = block.hash();
for tx in block.body().transactions() {
if let Some((mapped_hash, _)) = self.tx_hash_index.get(tx.tx_hash()) &&
*mapped_hash == block_hash
{
self.tx_hash_index.remove(tx.tx_hash());
}
self.tx_hash_index.remove(tx.tx_hash());
}
}
@@ -421,6 +413,15 @@ where
}
}
fn on_reorg_header(&mut self, block_hash: B256, res: ProviderResult<Provider::Header>) {
if let Some(queued) = self.headers_cache.remove(&block_hash) {
// send the response to queued senders
for tx in queued {
let _ = tx.send(res.clone());
}
}
}
/// Shrinks the queues but leaves some space for the next requests
fn shrink_queues(&mut self) {
let min_capacity = 2;
@@ -597,9 +598,12 @@ where
}
CacheAction::RemoveReorgedChain { chain_change } => {
for block in chain_change.blocks {
let block_hash = block.hash();
let header = block.clone_header();
// Remove transaction index entries for reorged blocks
this.remove_block_transactions(&block);
this.on_reorg_block(block.hash(), Ok(Some(block)));
this.on_reorg_block(block_hash, Ok(Some(block)));
this.on_reorg_header(block_hash, Ok(header));
}
for block_receipts in chain_change.receipts {
@@ -825,3 +829,89 @@ pub async fn cache_new_blocks_task<St, N: NodePrimitives>(
eth_state_cache.to_service.send(CacheAction::CacheNewCanonicalChain { chain_change });
}
}
#[cfg(test)]
mod tests {
use super::*;
use alloy_consensus::Header;
use alloy_primitives::{Address, Signature};
use reth_ethereum_primitives::{
Block, BlockBody, EthPrimitives, Transaction, TransactionSigned,
};
use reth_primitives_traits::RecoveredBlock;
use reth_storage_api::noop::NoopProvider;
fn test_service() -> EthStateCacheService<NoopProvider, Runtime> {
let (_cache, service) = EthStateCache::<EthPrimitives>::create(
NoopProvider::default(),
Runtime::test(),
4,
4,
4,
1,
16,
);
service
}
fn test_block() -> RecoveredBlock<Block> {
RecoveredBlock::new_unhashed(
Block {
header: Header { number: 1, ..Default::default() },
body: BlockBody {
transactions: vec![TransactionSigned::new_unhashed(
Transaction::Legacy(Default::default()),
Signature::test_signature(),
)],
..Default::default()
},
},
vec![Address::ZERO],
)
}
#[test]
fn reorg_evicts_cached_headers() {
let mut service = test_service();
let block_hash = B256::repeat_byte(0x11);
assert!(service
.headers_cache
.insert(block_hash, Header { number: 42, ..Default::default() }));
assert!(service.headers_cache.get(&block_hash).is_some());
service.on_reorg_header(block_hash, Ok(Header { number: 7, ..Default::default() }));
assert!(service.headers_cache.get(&block_hash).is_none());
}
#[test]
fn reorg_forwards_header_to_queued_requests() {
let mut service = test_service();
let block_hash = B256::repeat_byte(0x22);
let (response_tx, mut response_rx) = oneshot::channel();
let header = Header { number: 7, ..Default::default() };
assert!(service.headers_cache.queue(block_hash, response_tx));
service.on_reorg_header(block_hash, Ok(header));
let header =
response_rx.try_recv().expect("queued header response").expect("header result");
assert_eq!(header.number, 7);
}
#[test]
fn reorg_removes_tx_hash_index_entries_unconditionally() {
let mut service = test_service();
let block = test_block();
let tx_hash = *block.body().transactions().next().expect("test transaction").tx_hash();
service.tx_hash_index.insert(tx_hash, (B256::repeat_byte(0x33), 0));
service.remove_block_transactions(&block);
assert!(service.tx_hash_index.get(&tx_hash).is_none());
}
}

View File

@@ -169,7 +169,10 @@ pub fn get_filter_block_range(
// we cannot query blocks that don't exist yet
if to_block_number > info.best_number {
return Err(FilterBlockRangeError::BlockRangeExceedsHead);
return Err(FilterBlockRangeError::BlockRangeExceedsHead {
requested: to_block_number,
head: info.best_number,
});
}
Ok((from_block_number, to_block_number))
@@ -184,8 +187,13 @@ pub enum FilterBlockRangeError {
#[error("invalid block range params")]
InvalidBlockRange,
/// Block range extends beyond current head
#[error("block range extends beyond current head block")]
BlockRangeExceedsHead,
#[error("block range extends beyond current head block: requested {requested}, head {head}")]
BlockRangeExceedsHead {
/// The requested `toBlock` number
requested: u64,
/// The current head block number
head: u64,
},
}
#[cfg(test)]
@@ -227,7 +235,10 @@ mod tests {
let to = 15000002u64;
let info = ChainInfo { best_number: 15000000, ..Default::default() };
let err = get_filter_block_range(Some(from), Some(to), info.best_number, info).unwrap_err();
assert_eq!(err, FilterBlockRangeError::BlockRangeExceedsHead);
assert_eq!(
err,
FilterBlockRangeError::BlockRangeExceedsHead { requested: to, head: info.best_number }
);
}
#[test]
@@ -263,7 +274,10 @@ mod tests {
let to = 200;
let info = ChainInfo { best_number: 150, ..Default::default() };
let err = get_filter_block_range(Some(from), Some(to), 0, info).unwrap_err();
assert_eq!(err, FilterBlockRangeError::BlockRangeExceedsHead);
assert_eq!(
err,
FilterBlockRangeError::BlockRangeExceedsHead { requested: to, head: info.best_number }
);
}
#[test]

View File

@@ -25,6 +25,8 @@ pub enum TransactionSource<T = TransactionSigned> {
block_hash: B256,
/// Number of the block.
block_number: u64,
/// Timestamp of the block.
block_timestamp: u64,
/// base fee of the block.
base_fee: Option<u64>,
},
@@ -48,14 +50,21 @@ impl<T: SignedTransaction> TransactionSource<T> {
{
match self {
Self::Pool(tx) => resp_builder.fill_pending(tx),
Self::Block { transaction, index, block_hash, block_number, base_fee } => {
Self::Block {
transaction,
index,
block_hash,
block_number,
block_timestamp,
base_fee,
} => {
let tx_info = TransactionInfo {
hash: Some(transaction.trie_hash()),
index: Some(index),
block_hash: Some(block_hash),
block_number: Some(block_number),
block_timestamp: Some(block_timestamp),
base_fee,
..Default::default()
};
resp_builder.fill(transaction, tx_info)
@@ -70,7 +79,14 @@ impl<T: SignedTransaction> TransactionSource<T> {
let hash = tx.trie_hash();
(tx, TransactionInfo { hash: Some(hash), ..Default::default() })
}
Self::Block { transaction, index, block_hash, block_number, base_fee } => {
Self::Block {
transaction,
index,
block_hash,
block_number,
block_timestamp,
base_fee,
} => {
let hash = transaction.trie_hash();
(
transaction,
@@ -79,8 +95,8 @@ impl<T: SignedTransaction> TransactionSource<T> {
index: Some(index),
block_hash: Some(block_hash),
block_number: Some(block_number),
block_timestamp: Some(block_timestamp),
base_fee,
..Default::default()
},
)
}

View File

@@ -564,7 +564,10 @@ where
if let Some(t) = to &&
t > info.best_number
{
return Err(EthFilterError::BlockRangeExceedsHead);
return Err(EthFilterError::BlockRangeExceedsHead {
requested: t,
head: info.best_number,
});
}
if let Some(f) = from &&
@@ -942,8 +945,13 @@ pub enum EthFilterError {
#[error("invalid block range params")]
InvalidBlockRangeParams,
/// Block range extends beyond current head.
#[error("block range extends beyond current head block")]
BlockRangeExceedsHead,
#[error("block range extends beyond current head block: requested {requested}, head {head}")]
BlockRangeExceedsHead {
/// The requested `toBlock` number
requested: u64,
/// The current head block number
head: u64,
},
/// Query scope is too broad.
#[error("query exceeds max block range {0}")]
QueryExceedsMaxBlocks(u64),
@@ -979,7 +987,7 @@ impl From<EthFilterError> for jsonrpsee::types::error::ErrorObject<'static> {
err @ (EthFilterError::InvalidBlockRangeParams |
EthFilterError::QueryExceedsMaxBlocks(_) |
EthFilterError::QueryExceedsMaxResults { .. } |
EthFilterError::BlockRangeExceedsHead) => {
EthFilterError::BlockRangeExceedsHead { .. }) => {
rpc_error_with_code(jsonrpsee::types::error::INVALID_PARAMS_CODE, err.to_string())
}
}
@@ -996,7 +1004,9 @@ impl From<logs_utils::FilterBlockRangeError> for EthFilterError {
fn from(err: logs_utils::FilterBlockRangeError) -> Self {
match err {
logs_utils::FilterBlockRangeError::InvalidBlockRange => Self::InvalidBlockRangeParams,
logs_utils::FilterBlockRangeError::BlockRangeExceedsHead => Self::BlockRangeExceedsHead,
logs_utils::FilterBlockRangeError::BlockRangeExceedsHead { requested, head } => {
Self::BlockRangeExceedsHead { requested, head }
}
}
}
}

View File

@@ -1020,41 +1020,46 @@ mod tests {
done: true
} if processed == total && total == block.gas_used);
let provider = factory.provider().unwrap();
{
let provider = factory.provider().unwrap();
// check post state
let account1 = address!("0x1000000000000000000000000000000000000000");
let account1_info =
Account { balance: U256::ZERO, nonce: 0x00, bytecode_hash: Some(code_hash) };
let account2 = address!("0x2adc25665018aa1fe0e6bc666dac8fc2697ff9ba");
let account2_info = Account {
balance: U256::from(0x1bc16d674ece94bau128),
nonce: 0x00,
bytecode_hash: None,
};
let account3 = address!("0xa94f5374fce5edbc8e2a8697c15331677e6ebf0b");
let account3_info = Account {
balance: U256::from(0x3635c9adc5de996b46u128),
nonce: 0x01,
bytecode_hash: None,
};
// check post state
let account1 = address!("0x1000000000000000000000000000000000000000");
let account1_info =
Account { balance: U256::ZERO, nonce: 0x00, bytecode_hash: Some(code_hash) };
let account2 = address!("0x2adc25665018aa1fe0e6bc666dac8fc2697ff9ba");
let account2_info = Account {
balance: U256::from(0x1bc16d674ece94bau128),
nonce: 0x00,
bytecode_hash: None,
};
let account3 = address!("0xa94f5374fce5edbc8e2a8697c15331677e6ebf0b");
let account3_info = Account {
balance: U256::from(0x3635c9adc5de996b46u128),
nonce: 0x01,
bytecode_hash: None,
};
// assert accounts
assert!(
matches!(provider.basic_account(&account1), Ok(Some(acc)) if acc == account1_info)
);
assert!(
matches!(provider.basic_account(&account2), Ok(Some(acc)) if acc == account2_info)
);
assert!(
matches!(provider.basic_account(&account3), Ok(Some(acc)) if acc == account3_info)
);
// assert storage
// Get on dupsort would return only first value. This is good enough for this test.
assert!(matches!(
provider.tx_ref().get::<tables::PlainStorageState>(account1),
Ok(Some(entry)) if entry.key == B256::with_last_byte(1) && entry.value == U256::from(2)
));
// assert accounts
assert!(matches!(
provider.basic_account(&account1),
Ok(Some(acc)) if acc == account1_info
));
assert!(matches!(
provider.basic_account(&account2),
Ok(Some(acc)) if acc == account2_info
));
assert!(matches!(
provider.basic_account(&account3),
Ok(Some(acc)) if acc == account3_info
));
// assert storage
// Get on dupsort would return only first value. This is good enough for this test.
assert!(matches!(
provider.tx_ref().get::<tables::PlainStorageState>(account1),
Ok(Some(entry)) if entry.key == B256::with_last_byte(1) && entry.value == U256::from(2)
));
}
let mut provider = factory.database_provider_rw().unwrap();
let mut stage = stage();

View File

@@ -6,7 +6,8 @@
use crate::ChangesetOffset;
use std::{
fs::{File, OpenOptions},
io::{self, Read, Seek, SeekFrom, Write},
io::{self, Write},
os::unix::fs::FileExt,
path::Path,
};
@@ -177,16 +178,14 @@ impl ChangesetOffsetReader {
/// Reads a single changeset offset by block index.
/// Returns None if index is out of bounds.
pub fn get(&mut self, block_index: u64) -> io::Result<Option<ChangesetOffset>> {
pub fn get(&self, block_index: u64) -> io::Result<Option<ChangesetOffset>> {
if block_index >= self.len {
return Ok(None);
}
let byte_pos = block_index * Self::RECORD_SIZE as u64;
self.file.seek(SeekFrom::Start(byte_pos))?;
let mut buf = [0u8; Self::RECORD_SIZE];
self.file.read_exact(&mut buf)?;
self.file.read_exact_at(&mut buf, byte_pos)?;
let offset = u64::from_le_bytes(buf[..8].try_into().unwrap());
let num_changes = u64::from_le_bytes(buf[8..].try_into().unwrap());
@@ -195,7 +194,7 @@ impl ChangesetOffsetReader {
}
/// Reads a range of changeset offsets.
pub fn get_range(&mut self, start: u64, end: u64) -> io::Result<Vec<ChangesetOffset>> {
pub fn get_range(&self, start: u64, end: u64) -> io::Result<Vec<ChangesetOffset>> {
let end = end.min(self.len);
if start >= end {
return Ok(Vec::new());
@@ -203,13 +202,13 @@ impl ChangesetOffsetReader {
let count = (end - start) as usize;
let byte_pos = start * Self::RECORD_SIZE as u64;
self.file.seek(SeekFrom::Start(byte_pos))?;
let mut result = Vec::with_capacity(count);
let mut buf = [0u8; Self::RECORD_SIZE];
for _ in 0..count {
self.file.read_exact(&mut buf)?;
for i in 0..count {
let pos = byte_pos + (i as u64) * Self::RECORD_SIZE as u64;
self.file.read_exact_at(&mut buf, pos)?;
let offset = u64::from_le_bytes(buf[..8].try_into().unwrap());
let num_changes = u64::from_le_bytes(buf[8..].try_into().unwrap());
result.push(ChangesetOffset::new(offset, num_changes));
@@ -251,7 +250,7 @@ mod tests {
// Read
{
let mut reader = ChangesetOffsetReader::new(&path, 3).unwrap();
let reader = ChangesetOffsetReader::new(&path, 3).unwrap();
assert_eq!(reader.len(), 3);
let entry = reader.get(0).unwrap().unwrap();
@@ -284,7 +283,7 @@ mod tests {
writer.truncate(2).unwrap();
assert_eq!(writer.len(), 2);
let mut reader = ChangesetOffsetReader::new(&path, 2).unwrap();
let reader = ChangesetOffsetReader::new(&path, 2).unwrap();
assert_eq!(reader.len(), 2);
assert!(reader.get(2).unwrap().is_none());
}
@@ -317,7 +316,7 @@ mod tests {
assert_eq!(std::fs::metadata(&path).unwrap().len(), 16);
// Verify the complete record is readable
let mut reader = ChangesetOffsetReader::new(&path, 1).unwrap();
let reader = ChangesetOffsetReader::new(&path, 1).unwrap();
assert_eq!(reader.len(), 1);
let entry = reader.get(0).unwrap().unwrap();
assert_eq!(entry.offset(), 100);
@@ -340,7 +339,7 @@ mod tests {
}
// Open with len=2, ignoring the 3rd record
let mut reader = ChangesetOffsetReader::new(&path, 2).unwrap();
let reader = ChangesetOffsetReader::new(&path, 2).unwrap();
assert_eq!(reader.len(), 2);
// First two records should be readable
@@ -397,7 +396,7 @@ mod tests {
// Verify the records are correct
{
let mut reader = ChangesetOffsetReader::new(&path, 3).unwrap();
let reader = ChangesetOffsetReader::new(&path, 3).unwrap();
assert_eq!(reader.len(), 3);
let entry0 = reader.get(0).unwrap().unwrap();

View File

@@ -15,9 +15,9 @@ mod compression;
mod event;
mod segment;
#[cfg(feature = "std")]
#[cfg(all(feature = "std", unix))]
mod changeset_offsets;
#[cfg(feature = "std")]
#[cfg(all(feature = "std", unix))]
pub use changeset_offsets::{ChangesetOffsetReader, ChangesetOffsetWriter};
use alloy_primitives::BlockNumber;

View File

@@ -17,7 +17,7 @@ use roaring::RoaringTreemap;
/// - Direct access: elements can be accessed or queried without needing to decode the entire list.
/// - [`RoaringTreemap`] backing: internally backed by [`RoaringTreemap`], which supports 64-bit
/// integers.
#[derive(Clone, PartialEq, Default, Deref)]
#[derive(Clone, PartialEq, Eq, Default, Deref)]
pub struct IntegerList(pub RoaringTreemap);
impl fmt::Debug for IntegerList {

View File

@@ -42,12 +42,12 @@ rustc-hash = { workspace = true, optional = true, features = ["std"] }
sysinfo = { workspace = true, features = ["system"] }
parking_lot = { workspace = true, optional = true }
[target.'cfg(unix)'.dependencies]
libc.workspace = true
# arbitrary utils
strum = { workspace = true, features = ["derive"], optional = true }
[target.'cfg(unix)'.dependencies]
libc.workspace = true
[dev-dependencies]
# reth libs with arbitrary
reth-primitives-traits = { workspace = true, features = ["reth-codec"] }

View File

@@ -31,8 +31,6 @@ pub struct Cursor<K: TransactionKind, T: Table> {
buf: Vec<u8>,
/// Reference to metric handles in the DB environment. If `None`, metrics are not recorded.
metrics: Option<Arc<DatabaseEnvMetrics>>,
/// Optional artificial delay applied to MDBX read operations.
read_delay: Option<std::time::Duration>,
/// Phantom data to enforce encoding/decoding.
_dbi: PhantomData<T>,
}
@@ -41,9 +39,8 @@ impl<K: TransactionKind, T: Table> Cursor<K, T> {
pub(crate) const fn new_with_metrics(
inner: reth_libmdbx::Cursor<K>,
metrics: Option<Arc<DatabaseEnvMetrics>>,
read_delay: Option<std::time::Duration>,
) -> Self {
Self { inner, buf: Vec::new(), metrics, read_delay, _dbi: PhantomData }
Self { inner, buf: Vec::new(), metrics, _dbi: PhantomData }
}
/// If `self.metrics` is `Some(...)`, record a metric with the provided operation and value
@@ -62,15 +59,6 @@ impl<K: TransactionKind, T: Table> Cursor<K, T> {
f(self)
}
}
fn execute_with_read_delay<R>(
&mut self,
f: impl FnOnce(&mut reth_libmdbx::Cursor<K>) -> R,
) -> R {
let result = f(&mut self.inner);
apply_read_delay(self.read_delay);
result
}
}
/// Decodes a `(key, value)` pair from the database.
@@ -102,43 +90,39 @@ macro_rules! compress_to_buf_or_ref {
impl<K: TransactionKind, T: Table> DbCursorRO<T> for Cursor<K, T> {
fn first(&mut self) -> PairResult<T> {
decode::<T>(self.execute_with_read_delay(|cursor| cursor.first()))
decode::<T>(self.inner.first())
}
fn seek_exact(&mut self, key: <T as Table>::Key) -> PairResult<T> {
decode::<T>(self.execute_with_read_delay(|cursor| cursor.set_key(key.encode().as_ref())))
decode::<T>(self.inner.set_key(key.encode().as_ref()))
}
fn seek(&mut self, key: <T as Table>::Key) -> PairResult<T> {
decode::<T>(self.execute_with_read_delay(|cursor| cursor.set_range(key.encode().as_ref())))
decode::<T>(self.inner.set_range(key.encode().as_ref()))
}
fn next(&mut self) -> PairResult<T> {
decode::<T>(self.execute_with_read_delay(|cursor| cursor.next()))
decode::<T>(self.inner.next())
}
fn prev(&mut self) -> PairResult<T> {
decode::<T>(self.execute_with_read_delay(|cursor| cursor.prev()))
decode::<T>(self.inner.prev())
}
fn last(&mut self) -> PairResult<T> {
decode::<T>(self.execute_with_read_delay(|cursor| cursor.last()))
decode::<T>(self.inner.last())
}
fn current(&mut self) -> PairResult<T> {
decode::<T>(self.execute_with_read_delay(|cursor| cursor.get_current()))
decode::<T>(self.inner.get_current())
}
fn walk(&mut self, start_key: Option<T::Key>) -> Result<Walker<'_, T, Self>, DatabaseError> {
let start =
if let Some(start_key) = start_key {
decode::<T>(self.execute_with_read_delay(|cursor| {
cursor.set_range(start_key.encode().as_ref())
}))
.transpose()
} else {
self.first().transpose()
};
let start = if let Some(start_key) = start_key {
decode::<T>(self.inner.set_range(start_key.encode().as_ref())).transpose()
} else {
self.first().transpose()
};
Ok(Walker::new(self, start))
}
@@ -148,13 +132,11 @@ impl<K: TransactionKind, T: Table> DbCursorRO<T> for Cursor<K, T> {
range: impl RangeBounds<T::Key>,
) -> Result<RangeWalker<'_, T, Self>, DatabaseError> {
let start = match range.start_bound().cloned() {
Bound::Included(key) => {
self.execute_with_read_delay(|cursor| cursor.set_range(key.encode().as_ref()))
}
Bound::Included(key) => self.inner.set_range(key.encode().as_ref()),
Bound::Excluded(_key) => {
unreachable!("Rust doesn't allow for Bound::Excluded in starting bounds");
}
Bound::Unbounded => self.execute_with_read_delay(|cursor| cursor.first()),
Bound::Unbounded => self.inner.first(),
};
let start = decode::<T>(start).transpose();
Ok(RangeWalker::new(self, start, range.end_bound().cloned()))
@@ -164,15 +146,12 @@ impl<K: TransactionKind, T: Table> DbCursorRO<T> for Cursor<K, T> {
&mut self,
start_key: Option<T::Key>,
) -> Result<ReverseWalker<'_, T, Self>, DatabaseError> {
let start =
if let Some(start_key) = start_key {
decode::<T>(self.execute_with_read_delay(|cursor| {
cursor.set_range(start_key.encode().as_ref())
}))
} else {
self.last()
}
.transpose();
let start = if let Some(start_key) = start_key {
decode::<T>(self.inner.set_range(start_key.encode().as_ref()))
} else {
self.last()
}
.transpose();
Ok(ReverseWalker::new(self, start))
}
@@ -181,17 +160,18 @@ impl<K: TransactionKind, T: Table> DbCursorRO<T> for Cursor<K, T> {
impl<K: TransactionKind, T: DupSort> DbDupCursorRO<T> for Cursor<K, T> {
/// Returns the previous `(key, value)` pair of a DUPSORT table.
fn prev_dup(&mut self) -> PairResult<T> {
decode::<T>(self.execute_with_read_delay(|cursor| cursor.prev_dup()))
decode::<T>(self.inner.prev_dup())
}
/// Returns the next `(key, value)` pair of a DUPSORT table.
fn next_dup(&mut self) -> PairResult<T> {
decode::<T>(self.execute_with_read_delay(|cursor| cursor.next_dup()))
decode::<T>(self.inner.next_dup())
}
/// Returns the last `value` of the current duplicate `key`.
fn last_dup(&mut self) -> ValueOnlyResult<T> {
self.execute_with_read_delay(|cursor| cursor.last_dup())
self.inner
.last_dup()
.map_err(|e| DatabaseError::Read(e.into()))?
.map(decode_one::<T>)
.transpose()
@@ -199,12 +179,13 @@ impl<K: TransactionKind, T: DupSort> DbDupCursorRO<T> for Cursor<K, T> {
/// Returns the next `(key, value)` pair skipping the duplicates.
fn next_no_dup(&mut self) -> PairResult<T> {
decode::<T>(self.execute_with_read_delay(|cursor| cursor.next_nodup()))
decode::<T>(self.inner.next_nodup())
}
/// Returns the next `value` of a duplicate `key`.
fn next_dup_val(&mut self) -> ValueOnlyResult<T> {
self.execute_with_read_delay(|cursor| cursor.next_dup())
self.inner
.next_dup()
.map_err(|e| DatabaseError::Read(e.into()))?
.map(decode_value::<T>)
.transpose()
@@ -215,12 +196,11 @@ impl<K: TransactionKind, T: DupSort> DbDupCursorRO<T> for Cursor<K, T> {
key: <T as Table>::Key,
subkey: <T as DupSort>::SubKey,
) -> ValueOnlyResult<T> {
self.execute_with_read_delay(|cursor| {
cursor.get_both_range(key.encode().as_ref(), subkey.encode().as_ref())
})
.map_err(|e| DatabaseError::Read(e.into()))?
.map(decode_one::<T>)
.transpose()
self.inner
.get_both_range(key.encode().as_ref(), subkey.encode().as_ref())
.map_err(|e| DatabaseError::Read(e.into()))?
.map(decode_one::<T>)
.transpose()
}
/// Depending on its arguments, returns an iterator starting at:
@@ -236,26 +216,25 @@ impl<K: TransactionKind, T: DupSort> DbDupCursorRO<T> for Cursor<K, T> {
let start = match (key, subkey) {
(Some(key), Some(subkey)) => {
let encoded_key = key.encode();
self.execute_with_read_delay(|cursor| {
cursor.get_both_range(encoded_key.as_ref(), subkey.encode().as_ref())
})
.map_err(|e| DatabaseError::Read(e.into()))?
.map(|val| decoder::<T>((Cow::Borrowed(encoded_key.as_ref()), val)))
self.inner
.get_both_range(encoded_key.as_ref(), subkey.encode().as_ref())
.map_err(|e| DatabaseError::Read(e.into()))?
.map(|val| decoder::<T>((Cow::Borrowed(encoded_key.as_ref()), val)))
}
(Some(key), None) => {
let encoded_key = key.encode();
self.execute_with_read_delay(|cursor| cursor.set(encoded_key.as_ref()))
self.inner
.set(encoded_key.as_ref())
.map_err(|e| DatabaseError::Read(e.into()))?
.map(|val| decoder::<T>((Cow::Borrowed(encoded_key.as_ref()), val)))
}
(None, Some(subkey)) => {
if let Some((key, _)) = self.first()? {
let encoded_key = key.encode();
self.execute_with_read_delay(|cursor| {
cursor.get_both_range(encoded_key.as_ref(), subkey.encode().as_ref())
})
.map_err(|e| DatabaseError::Read(e.into()))?
.map(|val| decoder::<T>((Cow::Borrowed(encoded_key.as_ref()), val)))
self.inner
.get_both_range(encoded_key.as_ref(), subkey.encode().as_ref())
.map_err(|e| DatabaseError::Read(e.into()))?
.map(|val| decoder::<T>((Cow::Borrowed(encoded_key.as_ref()), val)))
} else {
Some(Err(DatabaseError::Read(MDBXError::NotFound.into())))
}
@@ -384,7 +363,7 @@ impl<T: DupSort> DbDupCursorRW<T> for Cursor<RW, T> {
mod tests {
use crate::{
mdbx::{DatabaseArguments, DatabaseEnv, DatabaseEnvKind},
tables::{self, StorageChangeSets},
tables::StorageChangeSets,
Database,
};
use alloy_primitives::{address, Address, B256, U256};
@@ -395,7 +374,6 @@ mod tests {
transaction::{DbTx, DbTxMut},
};
use reth_primitives_traits::StorageEntry;
use std::time::{Duration, Instant};
use tempfile::TempDir;
fn create_test_db() -> DatabaseEnv {
@@ -485,27 +463,4 @@ mod tests {
assert_eq!(copied_value, expected_value);
}
}
#[test]
fn read_delay_applies_to_cursor_reads() {
let path = TempDir::new().unwrap();
let mut db = DatabaseEnv::open(
path.path(),
DatabaseEnvKind::RW,
DatabaseArguments::new(ClientVersion::default())
.with_read_delay(Some(Duration::from_millis(2))),
)
.unwrap();
db.create_tables().unwrap();
let tx = db.tx_mut().unwrap();
tx.put::<tables::HeaderNumbers>(B256::ZERO, 1).unwrap();
tx.commit().unwrap();
let tx = db.tx().unwrap();
let mut cursor = tx.cursor_read::<tables::HeaderNumbers>().unwrap();
let start = Instant::now();
let _ = cursor.first().unwrap();
assert!(start.elapsed() >= Duration::from_millis(2));
}
}

View File

@@ -119,8 +119,6 @@ pub struct DatabaseArguments {
/// environments). Choose `SafeNoSync` if performance is more important and occasional data
/// loss is acceptable (e.g., testing or ephemeral data).
sync_mode: SyncMode,
/// Optional artificial delay applied to every MDBX read operation.
read_delay: Option<std::time::Duration>,
}
impl Default for DatabaseArguments {
@@ -145,7 +143,6 @@ impl DatabaseArguments {
exclusive: None,
max_readers: None,
sync_mode: SyncMode::Durable,
read_delay: None,
}
}
@@ -193,12 +190,6 @@ impl DatabaseArguments {
self
}
/// Sets an artificial delay applied to each MDBX read operation.
pub const fn with_read_delay(mut self, read_delay: Option<std::time::Duration>) -> Self {
self.read_delay = read_delay;
self
}
/// Configures the database growth step in bytes.
pub const fn with_growth_step(mut self, growth_step: Option<usize>) -> Self {
if let Some(growth_step) = growth_step {
@@ -263,8 +254,6 @@ pub struct DatabaseEnv {
dbis: Arc<HashMap<&'static str, ffi::MDBX_dbi>>,
/// Cache for metric handles. If `None`, metrics are not recorded.
metrics: Option<Arc<DatabaseEnvMetrics>>,
/// Optional artificial delay applied to every MDBX read operation.
read_delay: Option<std::time::Duration>,
/// Write lock for when dealing with a read-write environment.
_lock_file: Option<StorageLock>,
}
@@ -278,7 +267,6 @@ impl Database for DatabaseEnv {
self.inner.begin_ro_txn().map_err(|e| DatabaseError::InitTx(e.into()))?,
self.dbis.clone(),
self.metrics.clone(),
self.read_delay,
)
.map_err(|e| DatabaseError::InitTx(e.into()))
}
@@ -288,7 +276,6 @@ impl Database for DatabaseEnv {
self.inner.begin_rw_txn().map_err(|e| DatabaseError::InitTx(e.into()))?,
self.dbis.clone(),
self.metrics.clone(),
self.read_delay,
)
.map_err(|e| DatabaseError::InitTx(e.into()))
}
@@ -550,7 +537,6 @@ impl DatabaseEnv {
path: path.to_path_buf(),
dbis: Arc::default(),
metrics: None,
read_delay: args.read_delay,
_lock_file,
};

View File

@@ -40,8 +40,6 @@ pub struct Tx<K: TransactionKind> {
///
/// If [Some], then metrics are reported.
metrics_handler: Option<MetricsHandler<K>>,
/// Optional artificial delay applied to MDBX read operations.
read_delay: Option<Duration>,
}
impl<K: TransactionKind> Tx<K> {
@@ -52,7 +50,6 @@ impl<K: TransactionKind> Tx<K> {
inner: Transaction<K>,
dbis: Arc<HashMap<&'static str, MDBX_dbi>>,
env_metrics: Option<Arc<DatabaseEnvMetrics>>,
read_delay: Option<Duration>,
) -> reth_libmdbx::Result<Self> {
let metrics_handler = env_metrics
.map(|env_metrics| {
@@ -62,7 +59,7 @@ impl<K: TransactionKind> Tx<K> {
Ok(handler)
})
.transpose()?;
Ok(Self { inner, dbis, metrics_handler, read_delay })
Ok(Self { inner, dbis, metrics_handler })
}
/// Returns a reference to the inner libmdbx transaction.
@@ -104,7 +101,6 @@ impl<K: TransactionKind> Tx<K> {
Ok(Cursor::new_with_metrics(
inner,
self.metrics_handler.as_ref().map(|h| h.env_metrics.clone()),
self.read_delay,
))
}
@@ -174,19 +170,6 @@ impl<K: TransactionKind> Tx<K> {
f(&self.inner)
}
}
fn execute_with_read_delay<T: Table, R>(
&self,
operation: Operation,
value_size: Option<usize>,
f: impl FnOnce(&Transaction<K>) -> R,
) -> R {
self.execute_with_operation_metric::<T, _>(operation, value_size, |tx| {
let result = f(tx);
apply_read_delay(self.read_delay);
result
})
}
}
#[derive(Debug)]
@@ -311,7 +294,7 @@ impl<K: TransactionKind> DbTx for Tx<K> {
&self,
key: &<T::Key as Encode>::Encoded,
) -> Result<Option<T::Value>, DatabaseError> {
self.execute_with_read_delay::<T, _>(Operation::Get, None, |tx| {
self.execute_with_operation_metric::<T, _>(Operation::Get, None, |tx| {
tx.get(self.get_dbi::<T>()?, key.as_ref())
.map_err(|e| DatabaseError::Read(e.into()))?
.map(decode_one::<T>)
@@ -464,11 +447,7 @@ mod tests {
use reth_db_api::{database::Database, models::ClientVersion, transaction::DbTx};
use reth_libmdbx::MaxReadTransactionDuration;
use reth_storage_errors::db::DatabaseError;
use std::{
sync::atomic::Ordering,
thread::sleep,
time::{Duration, Instant},
};
use std::{sync::atomic::Ordering, thread::sleep, time::Duration};
use tempfile::tempdir;
#[test]
@@ -519,18 +498,4 @@ mod tests {
// Backtrace is recorded.
assert!(tx.metrics_handler.unwrap().backtrace_recorded.load(Ordering::Relaxed));
}
#[test]
fn read_delay_applies_to_get() {
let dir = tempdir().unwrap();
let args = DatabaseArguments::new(ClientVersion::default())
.with_read_delay(Some(Duration::from_millis(2)));
let mut db = DatabaseEnv::open(dir.path(), DatabaseEnvKind::RW, args).unwrap();
db.create_tables().unwrap();
let tx = db.tx().unwrap();
let start = Instant::now();
let _ = tx.get::<tables::Transactions>(0).unwrap();
assert!(start.elapsed() >= Duration::from_millis(2));
}
}

View File

@@ -4,10 +4,7 @@ use crate::{
table::{Decode, Decompress, Table, TableRow},
DatabaseError,
};
use std::{
borrow::Cow,
time::{Duration, Instant},
};
use std::borrow::Cow;
/// Helper function to decode a `(key, value)` pair.
pub(crate) fn decoder<'a, T>(
@@ -53,17 +50,3 @@ where
Cow::Owned(v) => Decompress::decompress_owned(v)?,
})
}
/// Applies an artificial read delay.
///
/// This uses a short busy-wait instead of `thread::sleep` so hidden benchmark knobs like
/// `--db.read-delay=500us` can approximate sub-millisecond latency without depending on OS sleep
/// granularity.
pub(crate) fn apply_read_delay(read_delay: Option<Duration>) {
let Some(read_delay) = read_delay.filter(|delay| !delay.is_zero()) else { return };
let start = Instant::now();
while start.elapsed() < read_delay {
std::hint::spin_loop();
}
}

View File

@@ -0,0 +1,109 @@
use alloy_primitives::{BlockHash, BlockNumber, Bytes};
use parking_lot::RwLock;
use reth_storage_api::{BalStore, GetBlockAccessListLimit};
use reth_storage_errors::provider::ProviderResult;
use std::{collections::HashMap, sync::Arc};
/// Basic in-memory BAL store keyed by block hash.
#[derive(Debug, Clone, Default)]
pub struct InMemoryBalStore {
entries: Arc<RwLock<HashMap<BlockHash, Bytes>>>,
}
impl BalStore for InMemoryBalStore {
fn insert(
&self,
block_hash: BlockHash,
_block_number: BlockNumber,
bal: Bytes,
) -> ProviderResult<()> {
self.entries.write().insert(block_hash, bal);
Ok(())
}
fn get_by_hashes(&self, block_hashes: &[BlockHash]) -> ProviderResult<Vec<Option<Bytes>>> {
let entries = self.entries.read();
let mut result = Vec::with_capacity(block_hashes.len());
for hash in block_hashes {
result.push(entries.get(hash).cloned());
}
Ok(result)
}
fn append_by_hashes_with_limit(
&self,
block_hashes: &[BlockHash],
limit: GetBlockAccessListLimit,
out: &mut Vec<Bytes>,
) -> ProviderResult<()> {
let entries = self.entries.read();
let mut size = 0;
for hash in block_hashes {
let bal = entries.get(hash).cloned().unwrap_or_else(|| Bytes::from_static(&[0xc0]));
size += bal.len();
out.push(bal);
if limit.exceeds(size) {
break
}
}
Ok(())
}
fn get_by_range(&self, _start: BlockNumber, _count: u64) -> ProviderResult<Vec<Bytes>> {
Ok(Vec::new())
}
}
#[cfg(test)]
mod tests {
use super::*;
use alloy_primitives::B256;
#[test]
fn insert_and_lookup_by_hash() {
let store = InMemoryBalStore::default();
let hash = B256::random();
let missing = B256::random();
let bal = Bytes::from_static(b"bal");
store.insert(hash, 1, bal.clone()).unwrap();
assert_eq!(store.get_by_hashes(&[hash, missing]).unwrap(), vec![Some(bal), None]);
}
#[test]
fn range_lookup_is_empty() {
let store = InMemoryBalStore::default();
assert!(store.get_by_range(1, 10).unwrap().is_empty());
}
#[test]
fn limited_lookup_returns_prefix() {
let store = InMemoryBalStore::default();
let hash0 = B256::random();
let hash1 = B256::random();
let hash2 = B256::random();
let bal0 = Bytes::from_static(&[0xc1, 0x01]);
let bal1 = Bytes::from_static(&[0xc1, 0x02]);
let bal2 = Bytes::from_static(&[0xc1, 0x03]);
store.insert(hash0, 1, bal0.clone()).unwrap();
store.insert(hash1, 2, bal1.clone()).unwrap();
store.insert(hash2, 3, bal2).unwrap();
let limited = store
.get_by_hashes_with_limit(
&[hash0, hash1, hash2],
GetBlockAccessListLimit::ResponseSizeSoftLimit(2),
)
.unwrap();
assert_eq!(limited, vec![bal0, bal1]);
}
}

View File

@@ -38,6 +38,9 @@ pub mod test_utils;
pub mod either_writer;
pub use either_writer::*;
mod bal;
pub use bal::InMemoryBalStore;
pub use reth_chain_state::{
CanonStateNotification, CanonStateNotificationSender, CanonStateNotificationStream,
CanonStateNotifications, CanonStateSubscriptions,
@@ -48,8 +51,9 @@ pub use revm_database::states::OriginalValuesKnown;
// reexport traits to avoid breaking changes
pub use reth_static_file_types as static_file;
pub use reth_storage_api::{
BalProvider, BalStore, BalStoreHandle, HistoryWriter, MetadataProvider, MetadataWriter,
NoopBalStore, StateWriteConfig, StatsReader, StorageSettings, StorageSettingsCache,
BalProvider, BalStore, BalStoreHandle, GetBlockAccessListLimit, HistoryWriter,
MetadataProvider, MetadataWriter, NoopBalStore, StateWriteConfig, StatsReader, StorageSettings,
StorageSettingsCache,
};
/// Re-export provider error.
pub use reth_storage_errors::provider::{ProviderError, ProviderResult};

View File

@@ -6,8 +6,8 @@ use crate::{
AccountReader, BalProvider, BalStoreHandle, BlockHashReader, BlockIdReader, BlockNumReader,
BlockReader, BlockReaderIdExt, BlockSource, CanonChainTracker, CanonStateNotifications,
CanonStateSubscriptions, ChainSpecProvider, ChainStateBlockReader, ChangeSetReader,
DatabaseProviderFactory, HashedPostStateProvider, HeaderProvider, ProviderError,
ProviderFactory, PruneCheckpointReader, ReceiptProvider, ReceiptProviderIdExt,
DatabaseProviderFactory, HashedPostStateProvider, HeaderProvider, InMemoryBalStore,
ProviderError, ProviderFactory, PruneCheckpointReader, ReceiptProvider, ReceiptProviderIdExt,
RocksDBProviderFactory, StageCheckpointReader, StateProviderBox, StateProviderFactory,
StateReader, StaticFileProviderFactory, TransactionVariant, TransactionsProvider,
};
@@ -111,7 +111,7 @@ impl<N: ProviderNodeTypes> BlockchainProvider<N> {
finalized_header,
safe_header,
),
bal_store: BalStoreHandle::default(),
bal_store: BalStoreHandle::new(InMemoryBalStore::default()),
})
}

View File

@@ -5,11 +5,11 @@ use crate::{
},
to_range,
traits::{BlockSource, ReceiptProvider},
BlockHashReader, BlockNumReader, BlockReader, ChainSpecProvider, DatabaseProviderFactory,
EitherWriterDestination, HashedPostStateProvider, HeaderProvider, HeaderSyncGapProvider,
MetadataProvider, ProviderError, PruneCheckpointReader, RocksDBProviderFactory,
StageCheckpointReader, StateProviderBox, StaticFileProviderFactory, StaticFileWriter,
TransactionVariant, TransactionsProvider,
BalProvider, BalStoreHandle, BlockHashReader, BlockNumReader, BlockReader, ChainSpecProvider,
DatabaseProviderFactory, EitherWriterDestination, HashedPostStateProvider, HeaderProvider,
HeaderSyncGapProvider, MetadataProvider, ProviderError, PruneCheckpointReader,
RocksDBProviderFactory, StageCheckpointReader, StateProviderBox, StaticFileProviderFactory,
StaticFileWriter, TransactionVariant, TransactionsProvider,
};
use alloy_consensus::transaction::TransactionMeta;
use alloy_eips::BlockHashOrNumber;
@@ -90,6 +90,8 @@ pub struct ProviderFactory<N: NodeTypesWithDB> {
rocksdb_provider: RocksDBProvider,
/// Changeset cache for trie unwinding
changeset_cache: ChangesetCache,
/// Store for block access lists.
bal_store: BalStoreHandle,
/// Task runtime for spawning parallel I/O work.
runtime: reth_tasks::Runtime,
/// Minimum distance from tip required before pruning can occur.
@@ -152,6 +154,7 @@ impl<N: ProviderNodeTypes> ProviderFactory<N> {
storage_settings: Arc::new(RwLock::new(storage_settings)),
rocksdb_provider,
changeset_cache: ChangesetCache::new(),
bal_store: BalStoreHandle::default(),
runtime,
minimum_pruning_distance: MINIMUM_UNWIND_SAFE_DISTANCE,
read_only_sync: None,
@@ -583,6 +586,12 @@ impl<N: NodeTypesWithDB> NodePrimitivesProvider for ProviderFactory<N> {
type Primitives = N::Primitives;
}
impl<N: NodeTypesWithDB> BalProvider for ProviderFactory<N> {
fn bal_store(&self) -> &BalStoreHandle {
&self.bal_store
}
}
impl<N: ProviderNodeTypes> DatabaseProviderFactory for ProviderFactory<N> {
type DB = N::DB;
type Provider = DatabaseProvider<<N::DB as Database>::TX, N>;
@@ -955,6 +964,7 @@ where
storage_settings,
rocksdb_provider,
changeset_cache,
bal_store,
runtime,
minimum_pruning_distance,
read_only_sync,
@@ -968,6 +978,7 @@ where
.field("storage_settings", &*storage_settings.read())
.field("rocksdb_provider", &rocksdb_provider)
.field("changeset_cache", &changeset_cache)
.field("bal_store", &bal_store)
.field("runtime", &runtime)
.field("minimum_pruning_distance", &minimum_pruning_distance)
.field(
@@ -989,6 +1000,7 @@ impl<N: NodeTypesWithDB> Clone for ProviderFactory<N> {
storage_settings: self.storage_settings.clone(),
rocksdb_provider: self.rocksdb_provider.clone(),
changeset_cache: self.changeset_cache.clone(),
bal_store: self.bal_store.clone(),
runtime: self.runtime.clone(),
minimum_pruning_distance: self.minimum_pruning_distance,
read_only_sync: self.read_only_sync.clone(),

View File

@@ -264,8 +264,8 @@ impl<TX: DbTx + 'static, N: NodeTypes> DatabaseProvider<TX, N> {
/// This keeps MDBX as the first durable step so an interrupted unwind can be recovered by
/// truncating static files from checkpoints on the next startup.
///
/// For `storage_v2`, this waits after the MDBX commit so readers holding older MDBX-visible
/// views cannot overlap the `RocksDB` unwind.
/// This waits after the MDBX commit so readers holding older MDBX-visible views cannot overlap
/// later cross-store unwind steps.
///
/// Historical `storage_v2` reads ignore `RocksDB` history entries above their MDBX-visible tip,
/// so no additional post-`RocksDB` wait is needed before static-file commit.
@@ -274,11 +274,11 @@ impl<TX: DbTx + 'static, N: NodeTypes> DatabaseProvider<TX, N> {
let reader_txn_tracker = self.reader_txn_tracker.clone();
self.tx.commit()?;
if storage_v2 {
if let Some(reader_txn_tracker) = reader_txn_tracker.as_ref() {
reader_txn_tracker.wait_for_pre_commit_readers();
}
if let Some(reader_txn_tracker) = reader_txn_tracker.as_ref() {
reader_txn_tracker.wait_for_pre_commit_readers();
}
if storage_v2 {
let batches = std::mem::take(&mut *self.pending_rocksdb_batches.lock());
for batch in batches {
self.rocksdb_provider.commit_batch(batch)?;
@@ -300,8 +300,16 @@ impl<TX: DbTx + 'static, N: NodeTypes> DatabaseProvider<TX, N> {
&'a self,
block_hash: BlockHash,
) -> ProviderResult<Box<dyn StateProvider + 'a>> {
let mut block_number =
let block_number =
self.block_number(block_hash)?.ok_or(ProviderError::BlockHashNotFound(block_hash))?;
self.history_by_block_number(block_number)
}
/// Storage provider for state at that given block number
pub fn history_by_block_number<'a>(
&'a self,
mut block_number: BlockNumber,
) -> ProviderResult<Box<dyn StateProvider + 'a>> {
if block_number == self.best_block_number().unwrap_or_default() &&
block_number == self.last_block_number().unwrap_or_default()
{
@@ -316,8 +324,8 @@ impl<TX: DbTx + 'static, N: NodeTypes> DatabaseProvider<TX, N> {
let storage_history_prune_checkpoint =
self.get_prune_checkpoint(PruneSegment::StorageHistory)?;
let mut state_provider = HistoricalStateProviderRef::new(self, block_number);
let mut state_provider =
HistoricalStateProviderRef::new(self, block_number, self.changeset_cache.clone());
// If we pruned account or storage history, we can't return state on every historical block.
// Instead, we should cap it at the latest prune checkpoint for corresponding prune segment.
if let Some(prune_checkpoint_block_number) =
@@ -933,8 +941,9 @@ impl<TX: DbTx + 'static, N: NodeTypes> TryIntoHistoricalStateProvider for Databa
self.get_prune_checkpoint(PruneSegment::AccountHistory)?;
let storage_history_prune_checkpoint =
self.get_prune_checkpoint(PruneSegment::StorageHistory)?;
let changeset_cache = self.changeset_cache.clone();
let mut state_provider = HistoricalStateProvider::new(self, block_number);
let mut state_provider = HistoricalStateProvider::new(self, block_number, changeset_cache);
// If we pruned account or storage history, we can't return state on every historical block.
// Instead, we should cap it at the latest prune checkpoint for corresponding prune segment.
@@ -3960,7 +3969,6 @@ mod tests {
#[test]
fn unwind_commit_waits_for_pre_commit_readers() {
let factory = create_test_provider_factory();
factory.set_storage_settings_cache(StorageSettings::v2());
let reader = factory.provider().unwrap();
let provider_rw = factory.unwind_provider_rw().unwrap();
@@ -4970,7 +4978,9 @@ mod tests {
assert_eq!(account_cs[0].address, address);
let historical_value =
HistoricalStateProviderRef::new(&*provider_rw, 0).storage(address, slot_key).unwrap();
HistoricalStateProviderRef::new(&*provider_rw, 0, ChangesetCache::new())
.storage(address, slot_key)
.unwrap();
assert_eq!(historical_value, None);
}

View File

@@ -20,7 +20,7 @@ pub use state::{
HistoricalStateProviderRef, HistoryInfo, LowestAvailableBlocks,
},
latest::{LatestStateProvider, LatestStateProviderRef},
overlay::{OverlayStateProvider, OverlayStateProviderFactory},
overlay::{OverlayBuilder, OverlayStateProvider, OverlayStateProviderFactory},
};
mod consistent_view;

View File

@@ -1,3 +1,4 @@
use super::overlay::{Overlay, OverlayBuilder, OverlaySource};
use crate::{
AccountReader, BlockHashReader, ChangeSetReader, EitherReader, HashedPostStateProvider,
ProviderError, RocksDBProviderFactory, StateProvider, StateRootProvider,
@@ -13,8 +14,9 @@ use reth_db_api::{
};
use reth_primitives_traits::{Account, Bytecode};
use reth_storage_api::{
BlockNumReader, BytecodeReader, DBProvider, NodePrimitivesProvider, StateProofProvider,
StorageChangeSetReader, StorageRootProvider, StorageSettingsCache,
BlockNumReader, BytecodeReader, DBProvider, NodePrimitivesProvider, PruneCheckpointReader,
StageCheckpointReader, StateProofProvider, StorageChangeSetReader, StorageRootProvider,
StorageSettingsCache,
};
use reth_storage_errors::provider::ProviderResult;
use reth_trie::{
@@ -23,16 +25,15 @@ use reth_trie::{
trie_cursor::InMemoryTrieCursorFactory,
updates::TrieUpdates,
witness::TrieWitness,
AccountProof, ExecutionWitnessMode, HashedPostState, HashedPostStateSorted, HashedStorage,
KeccakKeyHasher, MultiProof, MultiProofTargets, StateRoot, StorageMultiProof, StorageRoot,
TrieInput, TrieInputSorted,
AccountProof, ExecutionWitnessMode, HashedPostState, HashedStorage, KeccakKeyHasher,
MultiProof, MultiProofTargets, StateRoot, StorageMultiProof, StorageRoot, TrieInput,
TrieInputSorted,
};
use reth_trie_db::{
hashed_storage_from_reverts_with_provider, DatabaseProof, DatabaseStateRoot,
DatabaseStorageProof, DatabaseStorageRoot,
ChangesetCache, DatabaseProof, DatabaseStateRoot, DatabaseStorageProof, DatabaseStorageRoot,
};
use std::fmt::Debug;
use std::{fmt::Debug, sync::Arc};
type DbStateRoot<'a, TX, A> = StateRoot<
reth_trie_db::DatabaseTrieCursorFactory<&'a TX, A>,
@@ -123,6 +124,8 @@ impl HistoryInfo {
pub struct HistoricalStateProviderRef<'b, Provider> {
/// Database provider
provider: &'b Provider,
/// Changeset cache handle for retrieving trie changesets.
changeset_cache: ChangesetCache,
/// Block number is main index for the history state of accounts and storages.
block_number: BlockNumber,
/// Lowest blocks at which different parts of the state are available.
@@ -133,8 +136,17 @@ impl<'b, Provider: DBProvider + ChangeSetReader + StorageChangeSetReader + Block
HistoricalStateProviderRef<'b, Provider>
{
/// Create new `StateProvider` for historical block number
pub fn new(provider: &'b Provider, block_number: BlockNumber) -> Self {
Self { provider, block_number, lowest_available_blocks: Default::default() }
pub fn new(
provider: &'b Provider,
block_number: BlockNumber,
changeset_cache: ChangesetCache,
) -> Self {
Self {
provider,
changeset_cache,
block_number,
lowest_available_blocks: Default::default(),
}
}
/// Create new `StateProvider` for historical block number and lowest block numbers at which
@@ -143,8 +155,9 @@ impl<'b, Provider: DBProvider + ChangeSetReader + StorageChangeSetReader + Block
provider: &'b Provider,
block_number: BlockNumber,
lowest_available_blocks: LowestAvailableBlocks,
changeset_cache: ChangesetCache,
) -> Self {
Self { provider, block_number, lowest_available_blocks }
Self { provider, changeset_cache, block_number, lowest_available_blocks }
}
/// Lookup an account in the `AccountsHistory` table using `EitherReader`.
@@ -253,17 +266,11 @@ impl<'b, Provider: DBProvider + ChangeSetReader + StorageChangeSetReader + Block
Ok(tip.saturating_sub(self.block_number) > limit)
}
/// Retrieve revert hashed state for this history provider.
fn revert_state(&self) -> ProviderResult<HashedPostStateSorted>
fn build_overlay(&self, input: TrieInputSorted) -> ProviderResult<TrieInputSorted>
where
Provider: StorageSettingsCache,
Provider:
BlockHashReader + PruneCheckpointReader + StageCheckpointReader + StorageSettingsCache,
{
if !self.lowest_available_blocks.is_account_history_available(self.block_number) ||
!self.lowest_available_blocks.is_storage_history_available(self.block_number)
{
return Err(ProviderError::StateAtBlockPruned(self.block_number))
}
if self.check_distance_against_limit(EPOCH_SLOTS)? {
tracing::warn!(
target: "providers::historical_sp",
@@ -272,27 +279,22 @@ impl<'b, Provider: DBProvider + ChangeSetReader + StorageChangeSetReader + Block
);
}
reth_trie_db::from_reverts_auto(self.provider, self.block_number..)
}
// Historical providers expose state at the start of `self.block_number`, so the overlay
// builder needs the previous canonical block hash to preserve those semantics.
let target_block = self.block_number.saturating_sub(1);
let block_hash = self
.provider
.block_hash(target_block)?
.ok_or_else(|| ProviderError::HeaderNotFound(target_block.into()))?;
/// Retrieve revert hashed storage for this history provider and target address.
fn revert_storage(&self, address: Address) -> ProviderResult<HashedStorage>
where
Provider: StorageSettingsCache,
{
if !self.lowest_available_blocks.is_storage_history_available(self.block_number) {
return Err(ProviderError::StateAtBlockPruned(self.block_number))
}
let TrieInputSorted { nodes, state, prefix_sets } = input;
let overlay_builder = OverlayBuilder::new(self.changeset_cache.clone())
.with_block_hash(Some(block_hash))
.with_overlay_source(Some(OverlaySource::Immediate { trie: nodes, state }));
let Overlay { trie_updates, hashed_post_state } =
overlay_builder.build_overlay(self.provider)?;
if self.check_distance_against_limit(EPOCH_SLOTS * 10)? {
tracing::warn!(
target: "providers::historical_sp",
target = self.block_number,
"Attempt to calculate storage root for an old block might result in OOM"
);
}
hashed_storage_from_reverts_with_provider(self.provider, address, self.block_number)
Ok(TrieInputSorted::new(trie_updates, hashed_post_state, prefix_sets))
}
/// Set the lowest block number at which the account history is available.
@@ -378,26 +380,25 @@ impl<
+ ChangeSetReader
+ StorageChangeSetReader
+ BlockNumReader
+ BlockHashReader
+ PruneCheckpointReader
+ StageCheckpointReader
+ StorageSettingsCache,
> StateRootProvider for HistoricalStateProviderRef<'_, Provider>
{
fn state_root(&self, hashed_state: HashedPostState) -> ProviderResult<B256> {
reth_trie_db::with_adapter!(self.provider, |A| {
let mut revert_state = self.revert_state()?;
let hashed_state_sorted = hashed_state.into_sorted();
revert_state.extend_ref_and_sort(&hashed_state_sorted);
Ok(<DbStateRoot<'_, _, A>>::overlay_root(self.tx(), &revert_state)?)
let input = self.build_overlay(TrieInputSorted::from_unsorted(
TrieInput::from_state(hashed_state),
))?;
Ok(<DbStateRoot<'_, _, A>>::overlay_root_from_nodes(self.tx(), input)?)
})
}
fn state_root_from_nodes(&self, input: TrieInput) -> ProviderResult<B256> {
reth_trie_db::with_adapter!(self.provider, |A| {
let mut input = input;
input.prepend(self.revert_state()?.into());
Ok(<DbStateRoot<'_, _, A>>::overlay_root_from_nodes(
self.tx(),
TrieInputSorted::from_unsorted(input),
)?)
let input = self.build_overlay(TrieInputSorted::from_unsorted(input))?;
Ok(<DbStateRoot<'_, _, A>>::overlay_root_from_nodes(self.tx(), input)?)
})
}
@@ -406,10 +407,10 @@ impl<
hashed_state: HashedPostState,
) -> ProviderResult<(B256, TrieUpdates)> {
reth_trie_db::with_adapter!(self.provider, |A| {
let mut revert_state = self.revert_state()?;
let hashed_state_sorted = hashed_state.into_sorted();
revert_state.extend_ref_and_sort(&hashed_state_sorted);
Ok(<DbStateRoot<'_, _, A>>::overlay_root_with_updates(self.tx(), &revert_state)?)
let input = self.build_overlay(TrieInputSorted::from_unsorted(
TrieInput::from_state(hashed_state),
))?;
Ok(<DbStateRoot<'_, _, A>>::overlay_root_from_nodes_with_updates(self.tx(), input)?)
})
}
@@ -418,12 +419,8 @@ impl<
input: TrieInput,
) -> ProviderResult<(B256, TrieUpdates)> {
reth_trie_db::with_adapter!(self.provider, |A| {
let mut input = input;
input.prepend(self.revert_state()?.into());
Ok(<DbStateRoot<'_, _, A>>::overlay_root_from_nodes_with_updates(
self.tx(),
TrieInputSorted::from_unsorted(input),
)?)
let input = self.build_overlay(TrieInputSorted::from_unsorted(input))?;
Ok(<DbStateRoot<'_, _, A>>::overlay_root_from_nodes_with_updates(self.tx(), input)?)
})
}
}
@@ -433,6 +430,9 @@ impl<
+ ChangeSetReader
+ StorageChangeSetReader
+ BlockNumReader
+ BlockHashReader
+ PruneCheckpointReader
+ StageCheckpointReader
+ StorageSettingsCache,
> StorageRootProvider for HistoricalStateProviderRef<'_, Provider>
{
@@ -442,9 +442,20 @@ impl<
hashed_storage: HashedStorage,
) -> ProviderResult<B256> {
reth_trie_db::with_adapter!(self.provider, |A| {
let mut revert_storage = self.revert_storage(address)?;
revert_storage.extend(&hashed_storage);
<DbStorageRoot<'_, _, A>>::overlay_root(self.tx(), address, revert_storage)
let input = self.build_overlay(TrieInputSorted::from_unsorted(
TrieInput::from_state(HashedPostState::from_hashed_storage(
alloy_primitives::keccak256(address),
hashed_storage,
)),
))?;
let hashed_storage = input
.state
.account_storages()
.get(&alloy_primitives::keccak256(address))
.cloned()
.unwrap_or_default()
.into();
<DbStorageRoot<'_, _, A>>::overlay_root(self.tx(), address, hashed_storage)
.map_err(|err| ProviderError::Database(err.into()))
})
}
@@ -456,13 +467,24 @@ impl<
hashed_storage: HashedStorage,
) -> ProviderResult<reth_trie::StorageProof> {
reth_trie_db::with_adapter!(self.provider, |A| {
let mut revert_storage = self.revert_storage(address)?;
revert_storage.extend(&hashed_storage);
let input = self.build_overlay(TrieInputSorted::from_unsorted(
TrieInput::from_state(HashedPostState::from_hashed_storage(
alloy_primitives::keccak256(address),
hashed_storage,
)),
))?;
let hashed_storage = input
.state
.account_storages()
.get(&alloy_primitives::keccak256(address))
.cloned()
.unwrap_or_default()
.into();
<DbStorageProof<'_, _, A>>::overlay_storage_proof(
self.tx(),
address,
slot,
revert_storage,
hashed_storage,
)
.map_err(ProviderError::from)
})
@@ -475,13 +497,24 @@ impl<
hashed_storage: HashedStorage,
) -> ProviderResult<StorageMultiProof> {
reth_trie_db::with_adapter!(self.provider, |A| {
let mut revert_storage = self.revert_storage(address)?;
revert_storage.extend(&hashed_storage);
let input = self.build_overlay(TrieInputSorted::from_unsorted(
TrieInput::from_state(HashedPostState::from_hashed_storage(
alloy_primitives::keccak256(address),
hashed_storage,
)),
))?;
let hashed_storage = input
.state
.account_storages()
.get(&alloy_primitives::keccak256(address))
.cloned()
.unwrap_or_default()
.into();
<DbStorageProof<'_, _, A>>::overlay_storage_multiproof(
self.tx(),
address,
slots,
revert_storage,
hashed_storage,
)
.map_err(ProviderError::from)
})
@@ -493,6 +526,9 @@ impl<
+ ChangeSetReader
+ StorageChangeSetReader
+ BlockNumReader
+ BlockHashReader
+ PruneCheckpointReader
+ StageCheckpointReader
+ StorageSettingsCache,
> StateProofProvider for HistoricalStateProviderRef<'_, Provider>
{
@@ -504,8 +540,13 @@ impl<
slots: &[B256],
) -> ProviderResult<AccountProof> {
reth_trie_db::with_adapter!(self.provider, |A| {
let mut input = input;
input.prepend(self.revert_state()?.into());
let TrieInputSorted { nodes, state, prefix_sets } =
self.build_overlay(TrieInputSorted::from_unsorted(input))?;
let input = TrieInput::new(
Arc::unwrap_or_clone(nodes).into(),
Arc::unwrap_or_clone(state).into(),
prefix_sets,
);
let proof = <DbProof<'_, _, A> as DatabaseProof>::from_tx(self.tx());
proof.overlay_account_proof(input, address, slots).map_err(ProviderError::from)
})
@@ -517,8 +558,13 @@ impl<
targets: MultiProofTargets,
) -> ProviderResult<MultiProof> {
reth_trie_db::with_adapter!(self.provider, |A| {
let mut input = input;
input.prepend(self.revert_state()?.into());
let TrieInputSorted { nodes, state, prefix_sets } =
self.build_overlay(TrieInputSorted::from_unsorted(input))?;
let input = TrieInput::new(
Arc::unwrap_or_clone(nodes).into(),
Arc::unwrap_or_clone(state).into(),
prefix_sets,
);
let proof = <DbProof<'_, _, A> as DatabaseProof>::from_tx(self.tx());
proof.overlay_multiproof(input, targets).map_err(ProviderError::from)
})
@@ -531,21 +577,19 @@ impl<
mode: ExecutionWitnessMode,
) -> ProviderResult<Vec<Bytes>> {
reth_trie_db::with_adapter!(self.provider, |A| {
let mut input = input;
input.prepend(self.revert_state()?.into());
let nodes_sorted = input.nodes.into_sorted();
let state_sorted = input.state.into_sorted();
let TrieInputSorted { nodes, state, prefix_sets } =
self.build_overlay(TrieInputSorted::from_unsorted(input))?;
let witness = TrieWitness::new(
InMemoryTrieCursorFactory::new(
reth_trie_db::DatabaseTrieCursorFactory::<_, A>::new(self.tx()),
&nodes_sorted,
nodes.as_ref(),
),
HashedPostStateCursorFactory::new(
reth_trie_db::DatabaseHashedCursorFactory::new(self.tx()),
&state_sorted,
state.as_ref(),
),
)
.with_prefix_sets_mut(input.prefix_sets)
.with_prefix_sets_mut(prefix_sets)
.with_execution_witness_mode(mode);
let witness =
if mode.is_canonical() { witness } else { witness.always_include_root_node() };
@@ -572,6 +616,8 @@ impl<
+ BlockHashReader
+ ChangeSetReader
+ StorageChangeSetReader
+ PruneCheckpointReader
+ StageCheckpointReader
+ StorageSettingsCache
+ RocksDBProviderFactory
+ NodePrimitivesProvider,
@@ -602,6 +648,8 @@ impl<Provider: DBProvider + BlockNumReader> BytecodeReader
pub struct HistoricalStateProvider<Provider> {
/// Database provider.
provider: Provider,
/// Changeset cache handle for retrieving trie changesets.
changeset_cache: ChangesetCache,
/// State at the block number is the main indexer of the state.
block_number: BlockNumber,
/// Lowest blocks at which different parts of the state are available.
@@ -612,8 +660,17 @@ impl<Provider: DBProvider + ChangeSetReader + StorageChangeSetReader + BlockNumR
HistoricalStateProvider<Provider>
{
/// Create new `StateProvider` for historical block number
pub fn new(provider: Provider, block_number: BlockNumber) -> Self {
Self { provider, block_number, lowest_available_blocks: Default::default() }
pub fn new(
provider: Provider,
block_number: BlockNumber,
changeset_cache: ChangesetCache,
) -> Self {
Self {
provider,
changeset_cache,
block_number,
lowest_available_blocks: Default::default(),
}
}
/// Set the lowest block number at which the account history is available.
@@ -636,17 +693,18 @@ impl<Provider: DBProvider + ChangeSetReader + StorageChangeSetReader + BlockNumR
/// Returns a new provider that takes the `TX` as reference
#[inline(always)]
const fn as_ref(&self) -> HistoricalStateProviderRef<'_, Provider> {
fn as_ref(&self) -> HistoricalStateProviderRef<'_, Provider> {
HistoricalStateProviderRef::new_with_lowest_available_blocks(
&self.provider,
self.block_number,
self.lowest_available_blocks,
self.changeset_cache.clone(),
)
}
}
// Delegates all provider impls to [HistoricalStateProviderRef]
reth_storage_api::macros::delegate_provider_impls!(HistoricalStateProvider<Provider> where [Provider: DBProvider + BlockNumReader + BlockHashReader + ChangeSetReader + StorageChangeSetReader + StorageSettingsCache + RocksDBProviderFactory + NodePrimitivesProvider]);
reth_storage_api::macros::delegate_provider_impls!(HistoricalStateProvider<Provider> where [Provider: DBProvider + BlockNumReader + BlockHashReader + ChangeSetReader + StorageChangeSetReader + PruneCheckpointReader + StageCheckpointReader + StorageSettingsCache + RocksDBProviderFactory + NodePrimitivesProvider]);
/// Lowest blocks at which different parts of the state are available.
/// They may be [Some] if pruning is enabled.
@@ -779,9 +837,11 @@ mod tests {
use reth_primitives_traits::{Account, StorageEntry};
use reth_storage_api::{
BlockHashReader, BlockNumReader, ChangeSetReader, DBProvider, DatabaseProviderFactory,
NodePrimitivesProvider, StorageChangeSetReader, StorageSettingsCache,
NodePrimitivesProvider, PruneCheckpointReader, StageCheckpointReader,
StorageChangeSetReader, StorageSettingsCache,
};
use reth_storage_errors::provider::ProviderError;
use reth_trie_db::ChangesetCache;
const ADDRESS: Address = address!("0x0000000000000000000000000000000000000001");
const HIGHER_ADDRESS: Address = address!("0x0000000000000000000000000000000000000005");
@@ -796,6 +856,8 @@ mod tests {
+ BlockHashReader
+ ChangeSetReader
+ StorageChangeSetReader
+ PruneCheckpointReader
+ StageCheckpointReader
+ StorageSettingsCache
+ RocksDBProviderFactory
+ NodePrimitivesProvider,
@@ -870,48 +932,49 @@ mod tests {
// run
assert!(matches!(
HistoricalStateProviderRef::new(&db, 1).basic_account(&ADDRESS),
HistoricalStateProviderRef::new(&db, 1, ChangesetCache::new()).basic_account(&ADDRESS),
Ok(None)
));
assert!(matches!(
HistoricalStateProviderRef::new(&db, 2).basic_account(&ADDRESS),
HistoricalStateProviderRef::new(&db, 2, ChangesetCache::new()).basic_account(&ADDRESS),
Ok(Some(acc)) if acc == acc_at3
));
assert!(matches!(
HistoricalStateProviderRef::new(&db, 3).basic_account(&ADDRESS),
HistoricalStateProviderRef::new(&db, 3, ChangesetCache::new()).basic_account(&ADDRESS),
Ok(Some(acc)) if acc == acc_at3
));
assert!(matches!(
HistoricalStateProviderRef::new(&db, 4).basic_account(&ADDRESS),
HistoricalStateProviderRef::new(&db, 4, ChangesetCache::new()).basic_account(&ADDRESS),
Ok(Some(acc)) if acc == acc_at7
));
assert!(matches!(
HistoricalStateProviderRef::new(&db, 7).basic_account(&ADDRESS),
HistoricalStateProviderRef::new(&db, 7, ChangesetCache::new()).basic_account(&ADDRESS),
Ok(Some(acc)) if acc == acc_at7
));
assert!(matches!(
HistoricalStateProviderRef::new(&db, 9).basic_account(&ADDRESS),
HistoricalStateProviderRef::new(&db, 9, ChangesetCache::new()).basic_account(&ADDRESS),
Ok(Some(acc)) if acc == acc_at10
));
assert!(matches!(
HistoricalStateProviderRef::new(&db, 10).basic_account(&ADDRESS),
HistoricalStateProviderRef::new(&db, 10, ChangesetCache::new()).basic_account(&ADDRESS),
Ok(Some(acc)) if acc == acc_at10
));
assert!(matches!(
HistoricalStateProviderRef::new(&db, 11).basic_account(&ADDRESS),
HistoricalStateProviderRef::new(&db, 11, ChangesetCache::new()).basic_account(&ADDRESS),
Ok(Some(acc)) if acc == acc_at15
));
assert!(matches!(
HistoricalStateProviderRef::new(&db, 16).basic_account(&ADDRESS),
HistoricalStateProviderRef::new(&db, 16, ChangesetCache::new()).basic_account(&ADDRESS),
Ok(Some(acc)) if acc == acc_plain
));
assert!(matches!(
HistoricalStateProviderRef::new(&db, 1).basic_account(&HIGHER_ADDRESS),
HistoricalStateProviderRef::new(&db, 1, ChangesetCache::new())
.basic_account(&HIGHER_ADDRESS),
Ok(None)
));
assert!(matches!(
HistoricalStateProviderRef::new(&db, 1000).basic_account(&HIGHER_ADDRESS),
HistoricalStateProviderRef::new(&db, 1000, ChangesetCache::new()).basic_account(&HIGHER_ADDRESS),
Ok(Some(acc)) if acc == higher_acc_plain
));
}
@@ -970,43 +1033,46 @@ mod tests {
// run
assert!(matches!(
HistoricalStateProviderRef::new(&db, 0).storage(ADDRESS, STORAGE),
HistoricalStateProviderRef::new(&db, 0, ChangesetCache::new())
.storage(ADDRESS, STORAGE),
Ok(None)
));
assert!(matches!(
HistoricalStateProviderRef::new(&db, 3).storage(ADDRESS, STORAGE),
HistoricalStateProviderRef::new(&db, 3, ChangesetCache::new())
.storage(ADDRESS, STORAGE),
Ok(Some(U256::ZERO))
));
assert!(matches!(
HistoricalStateProviderRef::new(&db, 4).storage(ADDRESS, STORAGE),
HistoricalStateProviderRef::new(&db, 4, ChangesetCache::new()).storage(ADDRESS, STORAGE),
Ok(Some(expected_value)) if expected_value == entry_at7.value
));
assert!(matches!(
HistoricalStateProviderRef::new(&db, 7).storage(ADDRESS, STORAGE),
HistoricalStateProviderRef::new(&db, 7, ChangesetCache::new()).storage(ADDRESS, STORAGE),
Ok(Some(expected_value)) if expected_value == entry_at7.value
));
assert!(matches!(
HistoricalStateProviderRef::new(&db, 9).storage(ADDRESS, STORAGE),
HistoricalStateProviderRef::new(&db, 9, ChangesetCache::new()).storage(ADDRESS, STORAGE),
Ok(Some(expected_value)) if expected_value == entry_at10.value
));
assert!(matches!(
HistoricalStateProviderRef::new(&db, 10).storage(ADDRESS, STORAGE),
HistoricalStateProviderRef::new(&db, 10, ChangesetCache::new()).storage(ADDRESS, STORAGE),
Ok(Some(expected_value)) if expected_value == entry_at10.value
));
assert!(matches!(
HistoricalStateProviderRef::new(&db, 11).storage(ADDRESS, STORAGE),
HistoricalStateProviderRef::new(&db, 11, ChangesetCache::new()).storage(ADDRESS, STORAGE),
Ok(Some(expected_value)) if expected_value == entry_at15.value
));
assert!(matches!(
HistoricalStateProviderRef::new(&db, 16).storage(ADDRESS, STORAGE),
HistoricalStateProviderRef::new(&db, 16, ChangesetCache::new()).storage(ADDRESS, STORAGE),
Ok(Some(expected_value)) if expected_value == entry_plain.value
));
assert!(matches!(
HistoricalStateProviderRef::new(&db, 1).storage(HIGHER_ADDRESS, STORAGE),
HistoricalStateProviderRef::new(&db, 1, ChangesetCache::new())
.storage(HIGHER_ADDRESS, STORAGE),
Ok(None)
));
assert!(matches!(
HistoricalStateProviderRef::new(&db, 1000).storage(HIGHER_ADDRESS, STORAGE),
HistoricalStateProviderRef::new(&db, 1000, ChangesetCache::new()).storage(HIGHER_ADDRESS, STORAGE),
Ok(Some(expected_value)) if expected_value == higher_entry_plain.value
));
}
@@ -1025,6 +1091,7 @@ mod tests {
account_history_block_number: Some(3),
storage_history_block_number: Some(3),
},
ChangesetCache::new(),
);
assert!(matches!(
provider.account_history_lookup(ADDRESS),
@@ -1044,6 +1111,7 @@ mod tests {
account_history_block_number: Some(2),
storage_history_block_number: Some(2),
},
ChangesetCache::new(),
);
assert!(matches!(
provider.account_history_lookup(ADDRESS),
@@ -1063,6 +1131,7 @@ mod tests {
account_history_block_number: Some(1),
storage_history_block_number: Some(1),
},
ChangesetCache::new(),
);
assert!(matches!(
provider.account_history_lookup(ADDRESS),
@@ -1143,43 +1212,46 @@ mod tests {
let db = factory.provider().unwrap();
assert!(matches!(
HistoricalStateProviderRef::new(&db, 0).storage(ADDRESS, STORAGE),
HistoricalStateProviderRef::new(&db, 0, ChangesetCache::new())
.storage(ADDRESS, STORAGE),
Ok(None)
));
assert!(matches!(
HistoricalStateProviderRef::new(&db, 3).storage(ADDRESS, STORAGE),
HistoricalStateProviderRef::new(&db, 3, ChangesetCache::new())
.storage(ADDRESS, STORAGE),
Ok(Some(U256::ZERO))
));
assert!(matches!(
HistoricalStateProviderRef::new(&db, 4).storage(ADDRESS, STORAGE),
HistoricalStateProviderRef::new(&db, 4, ChangesetCache::new()).storage(ADDRESS, STORAGE),
Ok(Some(expected_value)) if expected_value == entry_at7.value
));
assert!(matches!(
HistoricalStateProviderRef::new(&db, 7).storage(ADDRESS, STORAGE),
HistoricalStateProviderRef::new(&db, 7, ChangesetCache::new()).storage(ADDRESS, STORAGE),
Ok(Some(expected_value)) if expected_value == entry_at7.value
));
assert!(matches!(
HistoricalStateProviderRef::new(&db, 9).storage(ADDRESS, STORAGE),
HistoricalStateProviderRef::new(&db, 9, ChangesetCache::new()).storage(ADDRESS, STORAGE),
Ok(Some(expected_value)) if expected_value == entry_at10.value
));
assert!(matches!(
HistoricalStateProviderRef::new(&db, 10).storage(ADDRESS, STORAGE),
HistoricalStateProviderRef::new(&db, 10, ChangesetCache::new()).storage(ADDRESS, STORAGE),
Ok(Some(expected_value)) if expected_value == entry_at10.value
));
assert!(matches!(
HistoricalStateProviderRef::new(&db, 11).storage(ADDRESS, STORAGE),
HistoricalStateProviderRef::new(&db, 11, ChangesetCache::new()).storage(ADDRESS, STORAGE),
Ok(Some(expected_value)) if expected_value == entry_at15.value
));
assert!(matches!(
HistoricalStateProviderRef::new(&db, 16).storage(ADDRESS, STORAGE),
HistoricalStateProviderRef::new(&db, 16, ChangesetCache::new()).storage(ADDRESS, STORAGE),
Ok(Some(expected_value)) if expected_value == entry_plain.value
));
assert!(matches!(
HistoricalStateProviderRef::new(&db, 1).storage(HIGHER_ADDRESS, STORAGE),
HistoricalStateProviderRef::new(&db, 1, ChangesetCache::new())
.storage(HIGHER_ADDRESS, STORAGE),
Ok(None)
));
assert!(matches!(
HistoricalStateProviderRef::new(&db, 1000).storage(HIGHER_ADDRESS, STORAGE),
HistoricalStateProviderRef::new(&db, 1000, ChangesetCache::new()).storage(HIGHER_ADDRESS, STORAGE),
Ok(Some(expected_value)) if expected_value == higher_entry_plain.value
));
}
@@ -1283,43 +1355,46 @@ mod tests {
let db = factory.provider().unwrap();
assert!(matches!(
HistoricalStateProviderRef::new(&db, 0).storage(ADDRESS, STORAGE),
HistoricalStateProviderRef::new(&db, 0, ChangesetCache::new())
.storage(ADDRESS, STORAGE),
Ok(None)
));
assert!(matches!(
HistoricalStateProviderRef::new(&db, 3).storage(ADDRESS, STORAGE),
HistoricalStateProviderRef::new(&db, 3, ChangesetCache::new())
.storage(ADDRESS, STORAGE),
Ok(Some(U256::ZERO))
));
assert!(matches!(
HistoricalStateProviderRef::new(&db, 4).storage(ADDRESS, STORAGE),
HistoricalStateProviderRef::new(&db, 4, ChangesetCache::new()).storage(ADDRESS, STORAGE),
Ok(Some(v)) if v == U256::from(7)
));
assert!(matches!(
HistoricalStateProviderRef::new(&db, 7).storage(ADDRESS, STORAGE),
HistoricalStateProviderRef::new(&db, 7, ChangesetCache::new()).storage(ADDRESS, STORAGE),
Ok(Some(v)) if v == U256::from(7)
));
assert!(matches!(
HistoricalStateProviderRef::new(&db, 9).storage(ADDRESS, STORAGE),
HistoricalStateProviderRef::new(&db, 9, ChangesetCache::new()).storage(ADDRESS, STORAGE),
Ok(Some(v)) if v == U256::from(10)
));
assert!(matches!(
HistoricalStateProviderRef::new(&db, 10).storage(ADDRESS, STORAGE),
HistoricalStateProviderRef::new(&db, 10, ChangesetCache::new()).storage(ADDRESS, STORAGE),
Ok(Some(v)) if v == U256::from(10)
));
assert!(matches!(
HistoricalStateProviderRef::new(&db, 11).storage(ADDRESS, STORAGE),
HistoricalStateProviderRef::new(&db, 11, ChangesetCache::new()).storage(ADDRESS, STORAGE),
Ok(Some(v)) if v == U256::from(15)
));
assert!(matches!(
HistoricalStateProviderRef::new(&db, 16).storage(ADDRESS, STORAGE),
HistoricalStateProviderRef::new(&db, 16, ChangesetCache::new()).storage(ADDRESS, STORAGE),
Ok(Some(v)) if v == U256::from(100)
));
assert!(matches!(
HistoricalStateProviderRef::new(&db, 1).storage(HIGHER_ADDRESS, STORAGE),
HistoricalStateProviderRef::new(&db, 1, ChangesetCache::new())
.storage(HIGHER_ADDRESS, STORAGE),
Ok(None)
));
assert!(matches!(
HistoricalStateProviderRef::new(&db, 1000).storage(HIGHER_ADDRESS, STORAGE),
HistoricalStateProviderRef::new(&db, 1000, ChangesetCache::new()).storage(HIGHER_ADDRESS, STORAGE),
Ok(Some(v)) if v == U256::from(1000)
));
}

View File

@@ -51,9 +51,9 @@ pub(crate) struct OverlayStateProviderMetrics {
/// Contains all fields required to initialize an [`OverlayStateProvider`].
#[derive(Debug, Clone)]
struct Overlay {
trie_updates: Arc<TrieUpdatesSorted>,
hashed_post_state: Arc<HashedPostStateSorted>,
pub(super) struct Overlay {
pub(super) trie_updates: Arc<TrieUpdatesSorted>,
pub(super) hashed_post_state: Arc<HashedPostStateSorted>,
}
/// Source of overlay data for [`OverlayStateProviderFactory`].
@@ -85,14 +85,12 @@ impl OverlaySource {
}
}
/// Factory for creating overlay state providers with optional reverts and overlays.
/// Builder for calculating trie and hashed-state overlays.
///
/// This factory allows building an `OverlayStateProvider` whose DB state has been reverted to a
/// particular block, and/or with additional overlay information added on top.
/// This stores the overlay configuration and the logic for resolving immediate/lazy overlays and
/// collecting reverts. It is intentionally independent from any provider factory or overlay cache.
#[derive(Debug, Clone)]
pub struct OverlayStateProviderFactory<F> {
/// The underlying database provider factory
factory: F,
pub struct OverlayBuilder {
/// Optional block hash for collecting reverts
block_hash: Option<B256>,
/// Optional overlay source (lazy or immediate).
@@ -101,21 +99,16 @@ pub struct OverlayStateProviderFactory<F> {
changeset_cache: ChangesetCache,
/// Metrics for tracking provider operations
metrics: OverlayStateProviderMetrics,
/// A cache which maps `db_tip -> Overlay`. If the db tip changes during usage of the factory
/// then a new entry will get added to this, but in most cases only one entry is present.
overlay_cache: Arc<DashMap<BlockNumber, Overlay>>,
}
impl<F> OverlayStateProviderFactory<F> {
/// Create a new overlay state provider factory
pub fn new(factory: F, changeset_cache: ChangesetCache) -> Self {
impl OverlayBuilder {
/// Create a new overlay builder.
pub fn new(changeset_cache: ChangesetCache) -> Self {
Self {
factory,
block_hash: None,
overlay_source: None,
changeset_cache,
metrics: OverlayStateProviderMetrics::default(),
overlay_cache: Default::default(),
}
}
@@ -131,8 +124,6 @@ impl<F> OverlayStateProviderFactory<F> {
/// This overlay will be applied on top of any reverts applied via `with_block_hash`.
pub fn with_overlay_source(mut self, source: Option<OverlaySource>) -> Self {
self.overlay_source = source;
// Clear the overlay cache since we've updated the source.
self.overlay_cache = Default::default();
self
}
@@ -141,8 +132,6 @@ impl<F> OverlayStateProviderFactory<F> {
/// Convenience method that wraps the lazy overlay in `OverlaySource::Lazy`.
pub fn with_lazy_overlay(mut self, lazy_overlay: Option<LazyOverlay>) -> Self {
self.overlay_source = lazy_overlay.map(OverlaySource::Lazy);
// Clear the overlay cache since we've updated the source.
self.overlay_cache = Default::default();
self
}
@@ -158,8 +147,6 @@ impl<F> OverlayStateProviderFactory<F> {
trie: Arc::new(TrieUpdatesSorted::default()),
state,
});
// Clear the overlay cache since we've updated the source.
self.overlay_cache = Default::default();
}
self
}
@@ -186,23 +173,9 @@ impl<F> OverlayStateProviderFactory<F> {
});
}
}
// Clear the overlay cache since we've updated the source.
self.overlay_cache = Default::default();
self
}
}
impl<F> OverlayStateProviderFactory<F>
where
F: DatabaseProviderFactory,
F::Provider: StageCheckpointReader
+ PruneCheckpointReader
+ ChangeSetReader
+ StorageChangeSetReader
+ DBProvider
+ BlockNumReader
+ StorageSettingsCache,
{
/// Resolves the effective overlay (trie updates, hashed state).
///
/// If an overlay source is set, it is resolved (blocking if lazy).
@@ -217,10 +190,13 @@ where
}
/// Returns the block number for [`Self`]'s `block_hash` field, if any.
fn get_requested_block_number(
fn get_requested_block_number<Provider>(
&self,
provider: &F::Provider,
) -> ProviderResult<Option<BlockNumber>> {
provider: &Provider,
) -> ProviderResult<Option<BlockNumber>>
where
Provider: BlockNumReader,
{
if let Some(block_hash) = self.block_hash {
Ok(Some(
provider
@@ -234,7 +210,10 @@ where
/// Returns the block which is at the tip of the DB, i.e. the block which the state tables of
/// the DB are currently synced to.
fn get_db_tip_block_number(&self, provider: &F::Provider) -> ProviderResult<BlockNumber> {
fn get_db_tip_block_number<Provider>(&self, provider: &Provider) -> ProviderResult<BlockNumber>
where
Provider: StageCheckpointReader,
{
provider
.get_stage_checkpoint(StageId::Finish)?
.as_ref()
@@ -247,12 +226,15 @@ where
///
/// Takes into account both the stage checkpoint and the prune checkpoint to determine the
/// available data range.
fn reverts_required(
fn reverts_required<Provider>(
&self,
provider: &F::Provider,
provider: &Provider,
db_tip_block: BlockNumber,
requested_block: BlockNumber,
) -> ProviderResult<bool> {
) -> ProviderResult<bool>
where
Provider: PruneCheckpointReader,
{
// If the requested block is the DB tip then there won't be any reverts necessary, and we
// can simply return Ok.
if db_tip_block == requested_block {
@@ -288,11 +270,20 @@ where
skip_all,
fields(%db_tip_block)
)]
fn calculate_overlay(
fn calculate_overlay<Provider>(
&self,
provider: &F::Provider,
provider: &Provider,
db_tip_block: BlockNumber,
) -> ProviderResult<Overlay> {
) -> ProviderResult<Overlay>
where
Provider: ChangeSetReader
+ StorageChangeSetReader
+ DBProvider
+ BlockNumReader
+ StageCheckpointReader
+ PruneCheckpointReader
+ StorageSettingsCache,
{
//
// Set up variables we'll use for recording metrics. There's two different code-paths here,
// and we want to make sure both record metrics, so we do metrics recording after.
@@ -404,23 +395,74 @@ where
Ok(Overlay { trie_updates, hashed_post_state })
}
/// Fetches an [`Overlay`] from the cache based on the current db tip block. If there is no
/// cached value then this calculates the [`Overlay`] and populates the cache.
/// Builds the effective overlay for the given provider.
#[instrument(level = "debug", target = "providers::state::overlay", skip_all)]
fn get_overlay(&self, provider: &F::Provider) -> ProviderResult<Overlay> {
// No anchor block — just resolve the in-memory overlay directly.
pub(super) fn build_overlay<Provider>(&self, provider: &Provider) -> ProviderResult<Overlay>
where
Provider: StageCheckpointReader
+ PruneCheckpointReader
+ ChangeSetReader
+ StorageChangeSetReader
+ DBProvider
+ BlockNumReader
+ StorageSettingsCache,
{
if self.block_hash.is_none() {
let (trie_updates, hashed_post_state) = self.resolve_overlays();
return Ok(Overlay { trie_updates, hashed_post_state })
}
let db_tip_block = self.get_db_tip_block_number(provider)?;
self.calculate_overlay(provider, db_tip_block)
}
}
/// Factory for creating overlay state providers with optional reverts and overlays.
///
/// This factory allows building an `OverlayStateProvider` whose DB state has been reverted to a
/// particular block, and/or with additional overlay information added on top.
#[derive(Debug, Clone)]
pub struct OverlayStateProviderFactory<F> {
/// The underlying database provider factory
factory: F,
/// Overlay builder containing the configuration and overlay calculation logic.
overlay_builder: OverlayBuilder,
/// A cache which maps `db_tip -> Overlay`. If the db tip changes during usage of the factory
/// then a new entry will get added to this, but in most cases only one entry is present.
overlay_cache: Arc<DashMap<BlockNumber, Overlay>>,
}
impl<F> OverlayStateProviderFactory<F> {
/// Create a new overlay state provider factory
pub fn new(factory: F, overlay_builder: OverlayBuilder) -> Self {
Self { factory, overlay_builder, overlay_cache: Default::default() }
}
/// Fetches an [`Overlay`] from the cache based on the current db tip block. If there is no
/// cached value then this calculates the [`Overlay`] and populates the cache.
#[instrument(level = "debug", target = "providers::state::overlay", skip_all)]
fn get_overlay<Provider>(&self, provider: &Provider) -> ProviderResult<Overlay>
where
Provider: StageCheckpointReader
+ PruneCheckpointReader
+ ChangeSetReader
+ StorageChangeSetReader
+ DBProvider
+ BlockNumReader
+ StorageSettingsCache,
{
// No anchor block — just resolve the in-memory overlay directly.
if self.overlay_builder.block_hash.is_none() {
return self.overlay_builder.build_overlay(provider)
}
let db_tip_block = self.overlay_builder.get_db_tip_block_number(provider)?;
let overlay = match self.overlay_cache.entry(db_tip_block) {
dashmap::Entry::Occupied(entry) => entry.get().clone(),
dashmap::Entry::Vacant(entry) => {
self.metrics.overlay_cache_misses.increment(1);
let overlay = self.calculate_overlay(provider, db_tip_block)?;
self.overlay_builder.metrics.overlay_cache_misses.increment(1);
let overlay = self.overlay_builder.build_overlay(provider)?;
entry.insert(overlay.clone());
overlay
}
@@ -451,14 +493,14 @@ where
let provider = {
let start = Instant::now();
let res = self.factory.database_provider_ro()?;
self.metrics.create_provider_duration.record(start.elapsed());
self.overlay_builder.metrics.create_provider_duration.record(start.elapsed());
res
};
let Overlay { trie_updates, hashed_post_state } = self.get_overlay(&provider)?;
let is_v2 = provider.cached_storage_settings().is_v2();
self.metrics.database_provider_ro_duration.record(overall_start.elapsed());
self.overlay_builder.metrics.database_provider_ro_duration.record(overall_start.elapsed());
Ok(OverlayStateProvider::new(provider, trie_updates, hashed_post_state, is_v2))
}
}

View File

@@ -17,7 +17,7 @@ use reth_db::static_file::{
use reth_db_api::table::{Decompress, Value};
use reth_node_types::NodePrimitives;
use reth_primitives_traits::{SealedHeader, SignedTransaction};
use reth_static_file_types::{ChangesetOffset, ChangesetOffsetReader};
use reth_static_file_types::ChangesetOffset;
use reth_storage_api::range_size_hint;
use reth_storage_errors::provider::{ProviderError, ProviderResult};
use std::{
@@ -111,15 +111,11 @@ impl<'a, N: NodePrimitives> StaticFileJarProvider<'a, N> {
return Ok(None);
};
let csoff_path = self.data_path().with_extension("csoff");
if !csoff_path.exists() {
return Ok(None);
if let Some(reader) = self.jar.value().csoff_reader() {
reader.get(index).map_err(ProviderError::other)
} else {
Ok(None)
}
let len = header.changeset_offsets_len();
let mut reader =
ChangesetOffsetReader::new(&csoff_path, len).map_err(ProviderError::other)?;
reader.get(index).map_err(ProviderError::other)
}
/// Reads all changeset offsets from the sidecar file.
@@ -138,15 +134,12 @@ impl<'a, N: NodePrimitives> StaticFileJarProvider<'a, N> {
return Ok(Some(Vec::new()));
}
let csoff_path = self.data_path().with_extension("csoff");
if !csoff_path.exists() {
return Ok(None);
if let Some(reader) = self.jar.value().csoff_reader() {
let offsets = reader.get_range(0, len).map_err(ProviderError::other)?;
Ok(Some(offsets))
} else {
Ok(None)
}
let mut reader =
ChangesetOffsetReader::new(&csoff_path, len).map_err(ProviderError::other)?;
let offsets = reader.get_range(0, len).map_err(ProviderError::other)?;
Ok(Some(offsets))
}
}

View File

@@ -16,9 +16,9 @@ mod metrics;
mod writer_tests;
use reth_nippy_jar::NippyJar;
use reth_static_file_types::{SegmentHeader, StaticFileSegment};
use reth_static_file_types::{ChangesetOffsetReader, SegmentHeader, StaticFileSegment};
use reth_storage_errors::provider::{ProviderError, ProviderResult};
use std::{ops::Deref, sync::Arc};
use std::{io, ops::Deref, sync::Arc};
/// Alias type for each specific `NippyJar`.
type LoadedJarRef<'a> =
@@ -29,6 +29,7 @@ type LoadedJarRef<'a> =
pub struct LoadedJar {
jar: NippyJar<SegmentHeader>,
mmap_handle: Arc<reth_nippy_jar::DataReader>,
csoff_reader: Option<ChangesetOffsetReader>,
}
impl LoadedJar {
@@ -36,7 +37,20 @@ impl LoadedJar {
match jar.open_data_reader() {
Ok(data_reader) => {
let mmap_handle = Arc::new(data_reader);
Ok(Self { jar, mmap_handle })
let csoff_reader = if jar.user_header().segment().is_change_based() {
let csoff_path = jar.data_path().with_extension("csoff");
let len = jar.user_header().changeset_offsets_len();
match ChangesetOffsetReader::new(&csoff_path, len) {
Ok(reader) => Some(reader),
Err(err) if err.kind() == io::ErrorKind::NotFound && len == 0 => None,
Err(err) => return Err(ProviderError::other(err)),
}
} else {
None
};
Ok(Self { jar, mmap_handle, csoff_reader })
}
Err(e) => Err(ProviderError::other(e)),
}
@@ -55,6 +69,11 @@ impl LoadedJar {
fn size(&self) -> usize {
self.mmap_handle.size() + self.mmap_handle.offsets_size()
}
/// Returns a reference to the cached changeset offset reader.
const fn csoff_reader(&self) -> Option<&ChangesetOffsetReader> {
self.csoff_reader.as_ref()
}
}
impl Deref for LoadedJar {

View File

@@ -400,7 +400,7 @@ impl<N: NodePrimitives> StaticFileProviderRW<N> {
// Step 2: Validate sidecar offsets against actual NippyJar state
let valid_blocks = if actual_sidecar_blocks > 0 {
let mut reader = ChangesetOffsetReader::new(&csoff_path, actual_sidecar_blocks)
let reader = ChangesetOffsetReader::new(&csoff_path, actual_sidecar_blocks)
.map_err(ProviderError::other)?;
// Find last block where offset + num_changes <= actual_nippy_rows
@@ -896,7 +896,7 @@ impl<N: NodePrimitives> StaticFileProviderRW<N> {
// Read offset for the block after last_block from sidecar.
// Use committed length from header, ignoring any uncommitted records
// that may exist in the file after a crash.
let mut reader = ChangesetOffsetReader::new(&csoff_path, changeset_offsets_len)
let reader = ChangesetOffsetReader::new(&csoff_path, changeset_offsets_len)
.map_err(ProviderError::other)?;
if let Some(next_offset) = reader.get(blocks_to_keep).map_err(ProviderError::other)? {
next_offset.offset()

View File

@@ -614,7 +614,7 @@ mod tests {
assert_eq!(get_header_block_count(&provider, 0), 5);
// Verify offsets are correct
let mut reader = ChangesetOffsetReader::new(&sidecar_path, 5).unwrap();
let reader = ChangesetOffsetReader::new(&sidecar_path, 5).unwrap();
let o0 = reader.get(0).unwrap().unwrap();
assert_eq!(o0.offset(), 0);

View File

@@ -22,12 +22,70 @@ pub trait BalStore: Send + Sync + 'static {
/// The returned vector must align with `block_hashes`.
fn get_by_hashes(&self, block_hashes: &[BlockHash]) -> ProviderResult<Vec<Option<Bytes>>>;
/// Fetch BAL response entries for the given block hashes, stopping after the soft limit is
/// exceeded.
///
/// Entries are returned in request order. Unavailable BALs are represented as an RLP-encoded
/// empty list (`0xc0`). The limit is soft: the entry that exceeds the limit is included.
fn get_by_hashes_with_limit(
&self,
block_hashes: &[BlockHash],
limit: GetBlockAccessListLimit,
) -> ProviderResult<Vec<Bytes>> {
let mut out = Vec::new();
self.append_by_hashes_with_limit(block_hashes, limit, &mut out)?;
out.shrink_to_fit();
Ok(out)
}
/// Extends the given vector with BAL response entries for the given hashes.
///
/// This adheres to the expected behavior of [`Self::get_by_hashes_with_limit`].
fn append_by_hashes_with_limit(
&self,
block_hashes: &[BlockHash],
limit: GetBlockAccessListLimit,
out: &mut Vec<Bytes>,
) -> ProviderResult<()> {
let mut size = 0;
for bal in self.get_by_hashes(block_hashes)? {
let bal = bal.unwrap_or_else(|| Bytes::from_static(&[0xc0]));
size += bal.len();
out.push(bal);
if limit.exceeds(size) {
break
}
}
Ok(())
}
/// Fetch BALs for the requested range.
///
/// Implementations may stop at the first gap and return the contiguous prefix.
fn get_by_range(&self, start: BlockNumber, count: u64) -> ProviderResult<Vec<Bytes>>;
}
/// The limit to enforce for [`BalStore::get_by_hashes_with_limit`].
#[derive(Debug, Clone, Copy, Eq, PartialEq)]
pub enum GetBlockAccessListLimit {
/// No limit, return all BALs.
None,
/// Enforce a size limit on the returned BALs, for example 2MB.
ResponseSizeSoftLimit(usize),
}
impl GetBlockAccessListLimit {
/// Returns true if the given size exceeds the limit.
#[inline]
pub const fn exceeds(&self, size: usize) -> bool {
match self {
Self::None => false,
Self::ResponseSizeSoftLimit(limit) => size > *limit,
}
}
}
/// Clone-friendly façade around a BAL store implementation.
#[derive(Clone)]
pub struct BalStoreHandle {
@@ -62,6 +120,28 @@ impl BalStoreHandle {
self.inner.get_by_hashes(block_hashes)
}
/// Fetch BAL response entries for the given block hashes, stopping after the soft limit is
/// exceeded.
#[inline]
pub fn get_by_hashes_with_limit(
&self,
block_hashes: &[BlockHash],
limit: GetBlockAccessListLimit,
) -> ProviderResult<Vec<Bytes>> {
self.inner.get_by_hashes_with_limit(block_hashes, limit)
}
/// Extends the given vector with BAL response entries for the given hashes.
#[inline]
pub fn append_by_hashes_with_limit(
&self,
block_hashes: &[BlockHash],
limit: GetBlockAccessListLimit,
out: &mut Vec<Bytes>,
) -> ProviderResult<()> {
self.inner.append_by_hashes_with_limit(block_hashes, limit, out)
}
/// Fetch BALs for the requested range.
#[inline]
pub fn get_by_range(&self, start: BlockNumber, count: u64) -> ProviderResult<Vec<Bytes>> {
@@ -106,6 +186,25 @@ impl BalStore for NoopBalStore {
Ok(block_hashes.iter().map(|_| None).collect())
}
fn append_by_hashes_with_limit(
&self,
block_hashes: &[BlockHash],
limit: GetBlockAccessListLimit,
out: &mut Vec<Bytes>,
) -> ProviderResult<()> {
let mut size = 0;
for _ in block_hashes {
let bal = Bytes::from_static(&[0xc0]);
size += bal.len();
out.push(bal);
if limit.exceeds(size) {
break
}
}
Ok(())
}
fn get_by_range(&self, _start: BlockNumber, _count: u64) -> ProviderResult<Vec<Bytes>> {
Ok(Vec::new())
}
@@ -127,4 +226,27 @@ mod tests {
assert_eq!(by_hash, vec![None, None]);
assert!(by_range.is_empty());
}
#[test]
fn noop_store_limited_lookup_returns_prefix() {
let store = BalStoreHandle::default();
let hashes = [B256::random(), B256::random(), B256::random()];
let limited = store
.get_by_hashes_with_limit(&hashes, GetBlockAccessListLimit::ResponseSizeSoftLimit(1))
.unwrap();
assert_eq!(limited, vec![Bytes::from_static(&[0xc0]), Bytes::from_static(&[0xc0])]);
}
#[test]
fn block_access_list_limit() {
let limit_none = GetBlockAccessListLimit::None;
assert!(!limit_none.exceeds(usize::MAX));
let size_limit_2mb = GetBlockAccessListLimit::ResponseSizeSoftLimit(2 * 1024 * 1024);
assert!(!size_limit_2mb.exceeds(1024 * 1024));
assert!(!size_limit_2mb.exceeds(2 * 1024 * 1024));
assert!(size_limit_2mb.exceeds(3 * 1024 * 1024));
}
}

View File

@@ -547,7 +547,7 @@ where
}
}
ensure_intrinsic_gas(transaction, &self.fork_tracker)?;
ensure_intrinsic_gas(transaction, &self.fork_tracker, block_gas_limit)?;
// light blob tx pre-checks
if transaction.is_eip4844() {
@@ -1404,6 +1404,7 @@ impl ForkTracker {
pub fn ensure_intrinsic_gas<T: EthPoolTransaction>(
transaction: &T,
fork_tracker: &ForkTracker,
block_gas_limit: u64,
) -> Result<(), InvalidPoolTransactionError> {
use revm_primitives::hardfork::SpecId;
let spec_id = if fork_tracker.is_prague_activated() {
@@ -1424,6 +1425,7 @@ pub fn ensure_intrinsic_gas<T: EthPoolTransaction>(
.map(|l| l.iter().map(|i| i.storage_keys.len()).sum::<usize>())
.unwrap_or_default() as u64,
transaction.authorization_list().map(|l| l.len()).unwrap_or_default() as u64,
revm_primitives::eip8037::cost_per_state_byte(block_gas_limit),
);
let gas_limit = transaction.gas_limit();
@@ -1478,11 +1480,11 @@ mod tests {
tx_gas_limit_cap: AtomicU64::new(0),
};
let res = ensure_intrinsic_gas(&transaction, &fork_tracker);
let res = ensure_intrinsic_gas(&transaction, &fork_tracker, 30_000_000);
assert!(res.is_ok());
fork_tracker.shanghai = true.into();
let res = ensure_intrinsic_gas(&transaction, &fork_tracker);
let res = ensure_intrinsic_gas(&transaction, &fork_tracker, 30_000_000);
assert!(res.is_ok());
let provider = MockEthProvider::default().with_genesis_block();

View File

@@ -1164,7 +1164,7 @@ mod tests {
let changeset_cache = reth_trie_db::ChangesetCache::new();
let factory = reth_provider::providers::OverlayStateProviderFactory::new(
provider_factory,
changeset_cache,
reth_provider::providers::OverlayBuilder::new(changeset_cache),
);
let ctx = test_ctx(factory);

View File

@@ -283,9 +283,10 @@ mod tests {
async fn random_parallel_root() {
let factory = create_test_provider_factory();
let changeset_cache = reth_trie_db::ChangesetCache::new();
let overlay_builder = reth_provider::providers::OverlayBuilder::new(changeset_cache);
let mut overlay_factory = reth_provider::providers::OverlayStateProviderFactory::new(
factory.clone(),
changeset_cache,
overlay_builder.clone(),
);
let mut rng = rand::rng();
@@ -362,8 +363,10 @@ mod tests {
}
let prefix_sets = hashed_state.construct_prefix_sets();
overlay_factory =
overlay_factory.with_hashed_state_overlay(Some(Arc::new(hashed_state.into_sorted())));
overlay_factory = reth_provider::providers::OverlayStateProviderFactory::new(
factory,
overlay_builder.with_hashed_state_overlay(Some(Arc::new(hashed_state.into_sorted()))),
);
assert_eq!(
ParallelStateRoot::new(overlay_factory, prefix_sets.freeze(), runtime)

View File

@@ -113,6 +113,11 @@ Database:
--db.sync-mode <SYNC_MODE>
Controls how aggressively the database synchronizes data to disk
--db.rocksdb-block-cache-size <ROCKSDB_BLOCK_CACHE_SIZE>
`RocksDB` block cache size (e.g., 512MB, 4GB).
Controls the size of the in-memory LRU cache for decompressed `RocksDB` blocks. A larger cache reduces repeated decompression of hot blocks, improving read performance for history lookups.
Static Files:
--static-files.blocks-per-file.headers <BLOCKS_PER_FILE_HEADERS>
Number of blocks per file for the headers segment

View File

@@ -62,6 +62,11 @@ Database:
--db.sync-mode <SYNC_MODE>
Controls how aggressively the database synchronizes data to disk
--db.rocksdb-block-cache-size <ROCKSDB_BLOCK_CACHE_SIZE>
`RocksDB` block cache size (e.g., 512MB, 4GB).
Controls the size of the in-memory LRU cache for decompressed `RocksDB` blocks. A larger cache reduces repeated decompression of hot blocks, improving read performance for history lookups.
--table <TABLE>
The table name to diff. If not specified, all tables are diffed.

View File

@@ -92,6 +92,11 @@ Database:
--db.sync-mode <SYNC_MODE>
Controls how aggressively the database synchronizes data to disk
--db.rocksdb-block-cache-size <ROCKSDB_BLOCK_CACHE_SIZE>
`RocksDB` block cache size (e.g., 512MB, 4GB).
Controls the size of the in-memory LRU cache for decompressed `RocksDB` blocks. A larger cache reduces repeated decompression of hot blocks, improving read performance for history lookups.
Static Files:
--static-files.blocks-per-file.headers <BLOCKS_PER_FILE_HEADERS>
Number of blocks per file for the headers segment

View File

@@ -92,6 +92,11 @@ Database:
--db.sync-mode <SYNC_MODE>
Controls how aggressively the database synchronizes data to disk
--db.rocksdb-block-cache-size <ROCKSDB_BLOCK_CACHE_SIZE>
`RocksDB` block cache size (e.g., 512MB, 4GB).
Controls the size of the in-memory LRU cache for decompressed `RocksDB` blocks. A larger cache reduces repeated decompression of hot blocks, improving read performance for history lookups.
Static Files:
--static-files.blocks-per-file.headers <BLOCKS_PER_FILE_HEADERS>
Number of blocks per file for the headers segment

View File

@@ -92,6 +92,11 @@ Database:
--db.sync-mode <SYNC_MODE>
Controls how aggressively the database synchronizes data to disk
--db.rocksdb-block-cache-size <ROCKSDB_BLOCK_CACHE_SIZE>
`RocksDB` block cache size (e.g., 512MB, 4GB).
Controls the size of the in-memory LRU cache for decompressed `RocksDB` blocks. A larger cache reduces repeated decompression of hot blocks, improving read performance for history lookups.
Static Files:
--static-files.blocks-per-file.headers <BLOCKS_PER_FILE_HEADERS>
Number of blocks per file for the headers segment

View File

@@ -92,6 +92,11 @@ Database:
--db.sync-mode <SYNC_MODE>
Controls how aggressively the database synchronizes data to disk
--db.rocksdb-block-cache-size <ROCKSDB_BLOCK_CACHE_SIZE>
`RocksDB` block cache size (e.g., 512MB, 4GB).
Controls the size of the in-memory LRU cache for decompressed `RocksDB` blocks. A larger cache reduces repeated decompression of hot blocks, improving read performance for history lookups.
Static Files:
--static-files.blocks-per-file.headers <BLOCKS_PER_FILE_HEADERS>
Number of blocks per file for the headers segment

View File

@@ -92,6 +92,11 @@ Database:
--db.sync-mode <SYNC_MODE>
Controls how aggressively the database synchronizes data to disk
--db.rocksdb-block-cache-size <ROCKSDB_BLOCK_CACHE_SIZE>
`RocksDB` block cache size (e.g., 512MB, 4GB).
Controls the size of the in-memory LRU cache for decompressed `RocksDB` blocks. A larger cache reduces repeated decompression of hot blocks, improving read performance for history lookups.
Static Files:
--static-files.blocks-per-file.headers <BLOCKS_PER_FILE_HEADERS>
Number of blocks per file for the headers segment

View File

@@ -92,6 +92,11 @@ Database:
--db.sync-mode <SYNC_MODE>
Controls how aggressively the database synchronizes data to disk
--db.rocksdb-block-cache-size <ROCKSDB_BLOCK_CACHE_SIZE>
`RocksDB` block cache size (e.g., 512MB, 4GB).
Controls the size of the in-memory LRU cache for decompressed `RocksDB` blocks. A larger cache reduces repeated decompression of hot blocks, improving read performance for history lookups.
Static Files:
--static-files.blocks-per-file.headers <BLOCKS_PER_FILE_HEADERS>
Number of blocks per file for the headers segment

View File

@@ -87,8 +87,8 @@ Networking:
--disable-discv4-discovery
Disable Discv4 discovery
--enable-discv5-discovery
Enable Discv5 discovery
--disable-discv5-discovery
Disable Discv5 discovery
--disable-nat
Disable Nat discovery
@@ -821,6 +821,11 @@ Database:
--db.sync-mode <SYNC_MODE>
Controls how aggressively the database synchronizes data to disk
--db.rocksdb-block-cache-size <ROCKSDB_BLOCK_CACHE_SIZE>
`RocksDB` block cache size (e.g., 512MB, 4GB).
Controls the size of the in-memory LRU cache for decompressed `RocksDB` blocks. A larger cache reduces repeated decompression of hot blocks, improving read performance for history lookups.
Dev testnet:
--dev
Start the node in dev mode
@@ -950,6 +955,13 @@ Engine:
[default: 0]
--engine.invalid-header-cache-hit-eviction-threshold <INVALID_HEADER_HIT_EVICTION_THRESHOLD>
Configure how many cache hits an invalid header can accumulate before it is evicted and reprocessed.
Set to `0` to effectively disable the cache because entries are evicted on the first lookup.
[default: 128]
--engine.legacy-state-root
Enable legacy state root

View File

@@ -27,8 +27,8 @@ Networking:
--disable-discv4-discovery
Disable Discv4 discovery
--enable-discv5-discovery
Enable Discv5 discovery
--disable-discv5-discovery
Disable Discv5 discovery
--disable-nat
Disable Nat discovery

View File

@@ -27,8 +27,8 @@ Networking:
--disable-discv4-discovery
Disable Discv4 discovery
--enable-discv5-discovery
Enable Discv5 discovery
--disable-discv5-discovery
Disable Discv5 discovery
--disable-nat
Disable Nat discovery

Some files were not shown because too many files have changed in this diff Show More