Compare commits

..

1 Commits

Author SHA1 Message Date
Sergei Shulepov
c825c7f7a4 perf(trie): make update_leaves parallelism threshold batch-scoped
Previously min_updates was checked per-subtrie, so small subtries were
always updated inline even when the overall batch was large enough to
benefit from parallelism. This changes the threshold to a batch-level
check so that once the batch is large enough, all subtries are deferred
for parallel processing.

Amp-Thread-ID: https://ampcode.com/threads/T-019ce331-57d8-74b3-9fa3-b58a3345623d
Co-authored-by: Amp <amp@ampcode.com>
2026-03-12 18:12:01 +00:00
85 changed files with 2251 additions and 1661 deletions

View File

@@ -1,5 +0,0 @@
---
reth-trie-sparse: patch
---
Refactored arena trie internals by adding a `BranchChildIdx::sibling()` helper, deduplicating `Index`/`NodeArena` type aliases, and replacing `is_empty()` with a `drop_root()` method. Fixed a bug where `cursor.pop()` was called before checking if the leaf was the root node, which could cause incorrect dirty-state propagation.

View File

@@ -5,4 +5,3 @@ self-hosted-runner:
- depot-ubuntu-latest-4
- depot-ubuntu-latest-8
- depot-ubuntu-latest-16
- available

View File

@@ -11,8 +11,6 @@
# BENCH_WAIT_TIME (duration like 500ms, default empty)
# BENCH_BASELINE_ARGS (extra reth node args for baseline runs)
# BENCH_FEATURE_ARGS (extra reth node args for feature runs)
# BENCH_OTLP_TRACES_ENDPOINT (OTLP HTTP endpoint for traces, e.g. https://host/insert/opentelemetry/v1/traces)
# BENCH_OTLP_LOGS_ENDPOINT (OTLP HTTP endpoint for logs, e.g. https://host/insert/opentelemetry/v1/logs)
set -euo pipefail
LABEL="$1"
@@ -141,14 +139,6 @@ if [ -n "${BENCH_METRICS_ADDR:-}" ]; then
RETH_ARGS+=(--metrics "$BENCH_METRICS_ADDR")
fi
# OTLP traces and logs export
if [ -n "${BENCH_OTLP_TRACES_ENDPOINT:-}" ]; then
RETH_ARGS+=(--tracing-otlp="${BENCH_OTLP_TRACES_ENDPOINT}" --tracing-otlp.service-name=reth-bench)
fi
if [ -n "${BENCH_OTLP_LOGS_ENDPOINT:-}" ]; then
RETH_ARGS+=(--logs-otlp="${BENCH_OTLP_LOGS_ENDPOINT}" --logs-otlp.filter=debug)
fi
# Tracy profiling: add --log.tracy flags and set environment
if [ "${BENCH_TRACY:-off}" != "off" ]; then
RETH_ARGS+=(--log.tracy --log.tracy.filter "${BENCH_TRACY_FILTER:-debug}")
@@ -159,29 +149,16 @@ if [ "${BENCH_TRACY:-off}" != "off" ]; then
fi
fi
SUDO_ENV=()
if [ -n "${OTEL_RESOURCE_ATTRIBUTES:-}" ]; then
SUDO_ENV+=("OTEL_RESOURCE_ATTRIBUTES=${OTEL_RESOURCE_ATTRIBUTES}")
SUDO_ENV+=("OTEL_BSP_MAX_QUEUE_SIZE=65536" "OTEL_BLRP_MAX_QUEUE_SIZE=65536")
fi
# Limit reth memory to 95% of available RAM to prevent OOM kills
TOTAL_MEM_KB=$(awk '/^MemTotal:/ {print $2}' /proc/meminfo)
MEM_LIMIT=$(( TOTAL_MEM_KB * 95 / 100 * 1024 ))
echo "Memory limit: $(( MEM_LIMIT / 1024 / 1024 ))MB (95% of $(( TOTAL_MEM_KB / 1024 ))MB)"
if [ "${BENCH_SAMPLY:-false}" = "true" ]; then
RETH_ARGS+=(--log.samply)
SAMPLY="$(which samply)"
sudo systemd-run --scope -p MemoryMax="$MEM_LIMIT" -p AllowedCPUs="$RETH_CPUS" \
env "${SUDO_ENV[@]}" nice -n -20 \
sudo taskset -c "$RETH_CPUS" nice -n -20 \
"$SAMPLY" record --save-only --presymbolicate --rate 10000 \
--output "$OUTPUT_DIR/samply-profile.json.gz" \
-- "$BINARY" "${RETH_ARGS[@]}" \
> "$LOG" 2>&1 &
else
sudo systemd-run --scope -p MemoryMax="$MEM_LIMIT" -p AllowedCPUs="$RETH_CPUS" \
env "${SUDO_ENV[@]}" nice -n -20 "$BINARY" "${RETH_ARGS[@]}" \
sudo taskset -c "$RETH_CPUS" nice -n -20 "$BINARY" "${RETH_ARGS[@]}" \
> "$LOG" 2>&1 &
fi

View File

@@ -7,8 +7,6 @@
// BENCH_PR PR number (may be empty)
// BENCH_ACTOR GitHub user who triggered the bench
// BENCH_JOB_URL URL to the Actions job page
// BENCH_BASELINE_ARGS Extra CLI args for the baseline reth node
// BENCH_FEATURE_ARGS Extra CLI args for the feature reth node
// BENCH_SAMPLY 'true' if samply profiling was enabled
//
// Usage from actions/github-script:
@@ -134,13 +132,7 @@ function buildSuccessBlocks({ summary, prNumber, actor, actorSlackId, jobUrl, re
if (cores !== '0') countsParts.push(`*Cores:* ${cores}`);
const countsLine = countsParts.join(' | ');
const baselineArgs = process.env.BENCH_BASELINE_ARGS || '';
const featureArgs = process.env.BENCH_FEATURE_ARGS || '';
const argsLines = [];
if (baselineArgs) argsLines.push(`*Baseline Args:* \`${baselineArgs}\``);
if (featureArgs) argsLines.push(`*Feature Args:* \`${featureArgs}\``);
const sectionText = [metaParts.join(' | '), '', baselineLine, featureLine, ...argsLines, countsLine].join('\n');
const sectionText = [metaParts.join(' | '), '', baselineLine, featureLine, countsLine].join('\n');
// Action buttons
const diffUrl = `https://github.com/${repo}/compare/${summary.baseline.ref}...${summary.feature.ref}`;

View File

@@ -476,7 +476,7 @@ jobs:
reth-bench:
needs: reth-bench-ack
name: reth-bench
runs-on: [self-hosted, Linux, X64, available]
runs-on: [self-hosted, Linux, X64]
timeout-minutes: 120
env:
BENCH_RPC_URL: https://ethereum.reth.rs/rpc
@@ -497,8 +497,6 @@ jobs:
BENCH_COMMENT_ID: ${{ needs.reth-bench-ack.outputs.comment-id }}
BENCH_NO_SLACK: ${{ needs.reth-bench-ack.outputs.no-slack }}
BENCH_METRICS_ADDR: "127.0.0.1:9100"
BENCH_OTLP_TRACES_ENDPOINT: ${{ secrets.BENCH_OTLP_TRACES_ENDPOINT }}
BENCH_OTLP_LOGS_ENDPOINT: ${{ secrets.BENCH_OTLP_LOGS_ENDPOINT }}
steps:
- name: Clean up previous bench-work
run: sudo rm -rf "$BENCH_WORK_DIR" 2>/dev/null || true
@@ -880,7 +878,6 @@ jobs:
id: run-baseline-1
env:
BASELINE_REF: ${{ steps.refs.outputs.baseline-ref }}
OTEL_RESOURCE_ATTRIBUTES: "benchmark_id=${{ env.BENCH_ID }},benchmark_run=baseline-1,run_type=baseline,git_ref=${{ steps.refs.outputs.baseline-ref }}"
run: |
cat > "$BENCH_LABELS_FILE" <<LABELS
{"benchmark_run":"baseline-1","run_type":"baseline","git_ref":"${BASELINE_REF}","bench_sha":"${BASELINE_REF}","benchmark_id":"${BENCH_ID}","run_start_epoch":"$(date +%s)","reference_epoch":"${BENCH_REFERENCE_EPOCH}"}
@@ -891,7 +888,6 @@ jobs:
id: run-feature-1
env:
FEATURE_REF: ${{ steps.refs.outputs.feature-ref }}
OTEL_RESOURCE_ATTRIBUTES: "benchmark_id=${{ env.BENCH_ID }},benchmark_run=feature-1,run_type=feature,git_ref=${{ steps.refs.outputs.feature-ref }}"
run: |
cat > "$BENCH_LABELS_FILE" <<LABELS
{"benchmark_run":"feature-1","run_type":"feature","git_ref":"${FEATURE_REF}","bench_sha":"${FEATURE_REF}","benchmark_id":"${BENCH_ID}","run_start_epoch":"$(date +%s)","reference_epoch":"${BENCH_REFERENCE_EPOCH}"}
@@ -899,11 +895,9 @@ jobs:
taskset -c 0 .github/scripts/bench-reth-run.sh feature ../reth-feature/target/profiling/reth "$BENCH_WORK_DIR/feature-1"
- name: "Run benchmark: feature (2/2)"
if: env.BENCH_ABBA != 'false'
id: run-feature-2
env:
FEATURE_REF: ${{ steps.refs.outputs.feature-ref }}
OTEL_RESOURCE_ATTRIBUTES: "benchmark_id=${{ env.BENCH_ID }},benchmark_run=feature-2,run_type=feature,git_ref=${{ steps.refs.outputs.feature-ref }}"
run: |
cat > "$BENCH_LABELS_FILE" <<LABELS
{"benchmark_run":"feature-2","run_type":"feature","git_ref":"${FEATURE_REF}","bench_sha":"${FEATURE_REF}","benchmark_id":"${BENCH_ID}","run_start_epoch":"$(date +%s)","reference_epoch":"${BENCH_REFERENCE_EPOCH}"}
@@ -911,11 +905,9 @@ jobs:
taskset -c 0 .github/scripts/bench-reth-run.sh feature ../reth-feature/target/profiling/reth "$BENCH_WORK_DIR/feature-2"
- name: "Run benchmark: baseline (2/2)"
if: env.BENCH_ABBA != 'false'
id: run-baseline-2
env:
BASELINE_REF: ${{ steps.refs.outputs.baseline-ref }}
OTEL_RESOURCE_ATTRIBUTES: "benchmark_id=${{ env.BENCH_ID }},benchmark_run=baseline-2,run_type=baseline,git_ref=${{ steps.refs.outputs.baseline-ref }}"
run: |
LAST_RUN_START=$(date +%s)
echo "BENCH_LAST_RUN_START=${LAST_RUN_START}" >> "$GITHUB_ENV"

View File

@@ -6,7 +6,7 @@ on:
hive_target:
required: true
type: string
description: "Docker bake target to build (e.g. hive)"
description: "Docker bake target to build (e.g. hive-stable, hive-edge)"
artifact_name:
required: false
type: string

View File

@@ -28,11 +28,6 @@ on:
required: false
type: boolean
default: false
pgo:
description: "Enable PGO profiling"
required: false
type: boolean
default: false
pgo_blocks:
description: "Number of blocks to execute for PGO profiling"
required: false
@@ -41,14 +36,14 @@ on:
jobs:
collect-pgo-profile:
if: github.repository == 'paradigmxyz/reth' && github.event_name == 'workflow_dispatch' && inputs.pgo
if: github.repository == 'paradigmxyz/reth'
uses: ./.github/workflows/pgo-profile.yml
with:
pgo_blocks: ${{ inputs.pgo_blocks || '20' }}
pgo_blocks: ${{ github.event_name == 'workflow_dispatch' && inputs.pgo_blocks || '20' }}
secrets: inherit
build:
if: github.repository == 'paradigmxyz/reth' && !failure() && !cancelled()
if: github.repository == 'paradigmxyz/reth'
name: Build Docker images
runs-on: ubuntu-24.04
needs: collect-pgo-profile
@@ -77,7 +72,6 @@ jobs:
echo "dirty=false" >> "$GITHUB_OUTPUT"
- name: Download pre-collected PGO profile
if: ${{ github.event_name == 'workflow_dispatch' && inputs.pgo }}
uses: actions/download-artifact@v7
with:
name: pgo-profdata
@@ -86,19 +80,13 @@ jobs:
- name: Configure PGO build args
id: pgo
run: |
if [[ "${{ github.event_name }}" == "workflow_dispatch" ]] && [[ "${{ inputs.pgo }}" == "true" ]]; then
if [ ! -f dist/merged.profdata ]; then
echo "::error::Expected dist/merged.profdata from collect-pgo-profile job"
exit 1
fi
echo "use_pgo_bolt=true" >> "$GITHUB_OUTPUT"
echo "pgo_profdata=dist/merged.profdata" >> "$GITHUB_OUTPUT"
echo "Using pre-collected PGO profile from collect-pgo-profile job"
else
echo "use_pgo_bolt=false" >> "$GITHUB_OUTPUT"
echo "pgo_profdata=" >> "$GITHUB_OUTPUT"
echo "PGO disabled"
if [ ! -f dist/merged.profdata ]; then
echo "::error::Expected dist/merged.profdata from collect-pgo-profile job"
exit 1
fi
echo "use_pgo_bolt=true" >> "$GITHUB_OUTPUT"
echo "pgo_profdata=dist/merged.profdata" >> "$GITHUB_OUTPUT"
echo "Using pre-collected PGO profile from collect-pgo-profile job"
- name: Determine build parameters
id: params

View File

@@ -63,6 +63,6 @@ jobs:
run: |
cargo nextest run \
--no-fail-fast \
--locked \
--locked --features "edge" \
-p reth-e2e-test-utils \
-E 'binary(rocksdb)'

View File

@@ -15,11 +15,18 @@ concurrency:
cancel-in-progress: true
jobs:
build-reth:
build-reth-stable:
uses: ./.github/workflows/docker-test.yml
with:
hive_target: hive
artifact_name: "reth"
hive_target: hive-stable
artifact_name: "reth-stable"
secrets: inherit
build-reth-edge:
uses: ./.github/workflows/docker-test.yml
with:
hive_target: hive-edge
artifact_name: "reth-edge"
secrets: inherit
prepare-hive:
@@ -77,6 +84,7 @@ jobs:
strategy:
fail-fast: false
matrix:
storage: [stable, edge]
# ethereum/rpc to be deprecated:
# https://github.com/ethereum/hive/pull/1117
scenario:
@@ -176,9 +184,10 @@ jobs:
- sim: ethereum/eels/consume-rlp
limit: .*tests/paris.*
needs:
- build-reth
- build-reth-stable
- build-reth-edge
- prepare-hive
name: ${{ matrix.scenario.sim }}${{ matrix.scenario.limit && format(' - {0}', matrix.scenario.limit) }}
name: ${{ matrix.storage }} / ${{ matrix.scenario.sim }}${{ matrix.scenario.limit && format(' - {0}', matrix.scenario.limit) }}
# Use larger runners for eels tests to avoid OOM runner crashes
runs-on: ${{ github.repository == 'paradigmxyz/reth' && (contains(matrix.scenario.sim, 'eels') && 'depot-ubuntu-latest-8' || 'depot-ubuntu-latest-4') || 'ubuntu-latest' }}
permissions:
@@ -197,7 +206,7 @@ jobs:
- name: Download reth image
uses: actions/download-artifact@v8
with:
name: reth
name: reth-${{ matrix.storage }}
path: /tmp
- name: Load Docker images

View File

@@ -22,7 +22,7 @@ concurrency:
jobs:
test:
name: test / ${{ matrix.network }}
name: test / ${{ matrix.network }} / ${{ matrix.storage }}
if: github.event_name != 'schedule'
runs-on: ${{ github.repository == 'paradigmxyz/reth' && 'depot-ubuntu-latest-4' || 'ubuntu-latest' }}
env:
@@ -30,6 +30,7 @@ jobs:
strategy:
matrix:
network: ["ethereum"]
storage: ["stable", "edge"]
timeout-minutes: 60
steps:
- uses: actions/checkout@v6
@@ -46,7 +47,7 @@ jobs:
run: |
cargo nextest run \
--no-fail-fast \
--locked --features "asm-keccak ${{ matrix.network }}" \
--locked --features "asm-keccak ${{ matrix.network }} ${{ matrix.storage == 'edge' && 'edge' || '' }}" \
--workspace --exclude ef-tests \
-E "kind(test) and not binary(e2e_testsuite)"

View File

@@ -13,10 +13,6 @@ on:
description: "Enable dry run mode (builds artifacts but skips uploads and release creation)"
type: boolean
default: false
pgo:
description: "Enable PGO profiling"
type: boolean
default: false
pgo_blocks:
description: "Number of blocks to execute for PGO profiling on self-hosted runner"
type: string
@@ -158,14 +154,12 @@ jobs:
path: ${{ matrix.build.binary }}-${{ needs.extract-version.outputs.VERSION }}-${{ matrix.configs.target }}.tar.gz.asc
collect-pgo-profile:
if: github.event_name == 'workflow_dispatch' && inputs.pgo
uses: ./.github/workflows/pgo-profile.yml
with:
pgo_blocks: ${{ inputs.pgo_blocks || '20' }}
pgo_blocks: ${{ github.event_name == 'workflow_dispatch' && inputs.pgo_blocks || '20' }}
secrets: inherit
build-pgo:
if: github.event_name == 'workflow_dispatch' && inputs.pgo
name: build release (x86_64-linux PGO+BOLT)
runs-on: [self-hosted, Linux, X64]
needs: [extract-version, collect-pgo-profile]
@@ -243,7 +237,7 @@ jobs:
name: draft release
runs-on: ubuntu-latest
needs: [build, build-pgo, extract-version]
if: ${{ !failure() && !cancelled() && github.event.inputs.dry_run != 'true' }}
if: ${{ github.event.inputs.dry_run != 'true' }}
env:
VERSION: ${{ needs.extract-version.outputs.VERSION }}
permissions:

View File

@@ -19,13 +19,15 @@ concurrency:
jobs:
test:
name: test / ${{ matrix.type }}
name: test / ${{ matrix.type }} / ${{ matrix.storage }}
runs-on: ${{ github.repository == 'paradigmxyz/reth' && 'depot-ubuntu-latest-4' || 'ubuntu-latest' }}
env:
RUST_BACKTRACE: 1
EDGE_FEATURES: ${{ matrix.storage == 'edge' && 'edge' || '' }}
strategy:
matrix:
type: [ethereum]
storage: [stable, edge]
include:
- type: ethereum
features: asm-keccak ethereum
@@ -48,14 +50,14 @@ jobs:
run: |
cargo nextest run \
--no-fail-fast \
--features "${{ matrix.features }}" --locked \
--features "${{ matrix.features }} $EDGE_FEATURES" --locked \
${{ matrix.exclude_args }} --workspace \
--exclude ef-tests --no-tests=warn \
-E "!kind(test) and not binary(e2e_testsuite)"
state:
name: Ethereum state tests
runs-on: ${{ github.repository == 'paradigmxyz/reth' && 'depot-ubuntu-latest-8' || 'ubuntu-latest' }}
runs-on: ${{ github.repository == 'paradigmxyz/reth' && 'depot-ubuntu-latest-4' || 'ubuntu-latest' }}
env:
RUST_LOG: info,sync=error
RUST_BACKTRACE: 1

158
Cargo.lock generated
View File

@@ -1003,9 +1003,9 @@ checksum = "4b46cbb362ab8752921c97e041f5e366ee6297bd428a31275b9fcf1e380f7299"
[[package]]
name = "anstream"
version = "1.0.0"
version = "0.6.21"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "824a212faf96e9acacdbd09febd34438f8f711fb84e09a8916013cd7815ca28d"
checksum = "43d5b281e737544384e969a5ccad3f1cdd24b48086a0fc1b2a5262a26b8f4f4a"
dependencies = [
"anstyle",
"anstyle-parse",
@@ -1018,15 +1018,15 @@ dependencies = [
[[package]]
name = "anstyle"
version = "1.0.14"
version = "1.0.13"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "940b3a0ca603d1eade50a4846a2afffd5ef57a9feac2c0e2ec2e14f9ead76000"
checksum = "5192cca8006f1fd4f7237516f40fa183bb07f8fbdfedaa0036de5ea9b0b45e78"
[[package]]
name = "anstyle-parse"
version = "1.0.0"
version = "0.2.7"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "52ce7f38b242319f7cabaa6813055467063ecdc9d355bbb4ce0c68908cd8130e"
checksum = "4e7644824f0aa2c7b9384579234ef10eb7efb6a0deb83f9630a49594dd9c15c2"
dependencies = [
"utf8parse",
]
@@ -2026,9 +2026,9 @@ dependencies = [
[[package]]
name = "c-kzg"
version = "2.1.7"
version = "2.1.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6648ed1e4ea8e8a1a4a2c78e1cda29a3fd500bc622899c340d8525ea9a76b24a"
checksum = "1a0f582957c24870b7bfd12bf562c40b4734b533cafbaf8ded31d6d85f462c01"
dependencies = [
"arbitrary",
"blst",
@@ -2113,9 +2113,9 @@ dependencies = [
[[package]]
name = "cc"
version = "1.2.57"
version = "1.2.56"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7a0dd1ca384932ff3641c8718a02769f1698e7563dc6974ffd03346116310423"
checksum = "aebf35691d1bfb0ac386a69bac2fde4dd276fb618cf8bf4f5318fe285e821bb2"
dependencies = [
"find-msvc-tools",
"jobserver",
@@ -2214,9 +2214,9 @@ dependencies = [
[[package]]
name = "clap"
version = "4.6.0"
version = "4.5.60"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b193af5b67834b676abd72466a96c1024e6a6ad978a1f484bd90b85c94041351"
checksum = "2797f34da339ce31042b27d23607e051786132987f595b02ba4f6a6dffb7030a"
dependencies = [
"clap_builder",
"clap_derive",
@@ -2224,9 +2224,9 @@ dependencies = [
[[package]]
name = "clap_builder"
version = "4.6.0"
version = "4.5.60"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "714a53001bf66416adb0e2ef5ac857140e7dc3a0c48fb28b2f10762fc4b5069f"
checksum = "24a241312cea5059b13574bb9b3861cabf758b879c15190b37b6d6fd63ab6876"
dependencies = [
"anstream",
"anstyle",
@@ -2236,9 +2236,9 @@ dependencies = [
[[package]]
name = "clap_derive"
version = "4.6.0"
version = "4.5.55"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1110bd8a634a1ab8cb04345d8d878267d57c3cf1b38d91b71af6686408bbca6a"
checksum = "a92793da1a46a5f2a02a6f4c46c6496b28c43638adea8306fcb0caa1634f24e5"
dependencies = [
"heck",
"proc-macro2",
@@ -2248,9 +2248,9 @@ dependencies = [
[[package]]
name = "clap_lex"
version = "1.1.0"
version = "1.0.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c8d4a3bb8b1e0c1050499d1815f5ab16d04f0959b233085fb31653fbfc9d98f9"
checksum = "3a822ea5bc7590f9d40f1ba12c0dc3c2760f3482c6984db1573ad11031420831"
[[package]]
name = "cmake"
@@ -2272,9 +2272,9 @@ dependencies = [
[[package]]
name = "codspeed"
version = "4.4.1"
version = "4.3.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b684e94583e85a5ca7e1a6454a89d76a5121240f2fb67eb564129d9bafdb9db0"
checksum = "38c2eb3388ebe26b5a0ab6bf4969d9c4840143d7f6df07caa3cc851b0606cef6"
dependencies = [
"anyhow",
"cc",
@@ -2282,7 +2282,7 @@ dependencies = [
"getrandom 0.2.17",
"glob",
"libc",
"nix",
"nix 0.30.1",
"serde",
"serde_json",
"statrs",
@@ -2290,9 +2290,9 @@ dependencies = [
[[package]]
name = "codspeed-criterion-compat"
version = "4.4.1"
version = "4.3.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "2e65444156eb73ad7f57618188f8d4a281726d133ef55b96d1dcff89528609ab"
checksum = "e1e270597a1d1e183f86d1cc9f94f0133654ee3daf201c17903ee29363555dd7"
dependencies = [
"clap",
"codspeed",
@@ -2303,9 +2303,9 @@ dependencies = [
[[package]]
name = "codspeed-criterion-compat-walltime"
version = "4.4.1"
version = "4.3.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "96389aaa4bbb872ea4924dc0335b2bb181bcf28d6eedbe8fea29afcc5bde36a6"
checksum = "e6c2613d2fac930fe34456be76f9124ee0800bb9db2e7fd2d6c65b9ebe98a292"
dependencies = [
"anes",
"cast",
@@ -2408,9 +2408,9 @@ dependencies = [
[[package]]
name = "colorchoice"
version = "1.0.5"
version = "1.0.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1d07550c9036bf2ae0c684c4297d503f838287c83c53686d05370d0e139ae570"
checksum = "b05b61dc5112cbb17e4b6cd61790d9845d13888356391624cbe7e41efeac1e75"
[[package]]
name = "colored"
@@ -2782,7 +2782,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e0b1fab2ae45819af2d0731d60f2afe17227ebb1a1538a236da84c93e9a60162"
dependencies = [
"dispatch2",
"nix",
"nix 0.31.2",
"windows-sys 0.61.2",
]
@@ -3023,9 +3023,9 @@ dependencies = [
[[package]]
name = "derive-where"
version = "1.6.1"
version = "1.6.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d08b3a0bcc0d079199cd476b2cae8435016ec11d1c0986c6901c5ac223041534"
checksum = "ef941ded77d15ca19b40374869ac6000af1c9f2a4c0f3d4c70926287e6364a8f"
dependencies = [
"proc-macro2",
"quote",
@@ -5500,9 +5500,9 @@ dependencies = [
[[package]]
name = "kasuari"
version = "0.4.12"
version = "0.4.11"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "bde5057d6143cc94e861d90f591b9303d6716c6b9602309150bd068853c10899"
checksum = "8fe90c1150662e858c7d5f945089b7517b0a80d8bf7ba4b1b5ffc984e7230a5b"
dependencies = [
"hashbrown 0.16.1",
"portable-atomic",
@@ -5654,9 +5654,9 @@ dependencies = [
[[package]]
name = "libz-sys"
version = "1.1.25"
version = "1.1.24"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d52f4c29e2a68ac30c9087e1b772dc9f44a2b66ed44edf2266cf2be9b03dafc1"
checksum = "4735e9cbde5aac84a5ce588f6b23a90b9b0b528f6c5a8db8a4aff300463a0839"
dependencies = [
"cc",
"libc",
@@ -5733,7 +5733,7 @@ dependencies = [
"generator",
"scoped-tls",
"tracing",
"tracing-subscriber 0.3.23",
"tracing-subscriber 0.3.22",
]
[[package]]
@@ -5781,9 +5781,9 @@ dependencies = [
[[package]]
name = "lz4_flex"
version = "0.12.1"
version = "0.12.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "98c23545df7ecf1b16c303910a69b079e8e251d60f7dd2cc9b4177f2afaf1746"
checksum = "ab6473172471198271ff72e9379150e9dfd70d8e533e0752a27e515b48dd375e"
[[package]]
name = "mach2"
@@ -6086,6 +6086,18 @@ dependencies = [
"unsigned-varint",
]
[[package]]
name = "nix"
version = "0.30.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "74523f3a35e05aba87a1d978330aef40f67b0304ac79c1c00b294c9830543db6"
dependencies = [
"bitflags 2.11.0",
"cfg-if",
"cfg_aliases",
"libc",
]
[[package]]
name = "nix"
version = "0.31.2"
@@ -6301,9 +6313,9 @@ dependencies = [
[[package]]
name = "objc2-core-foundation"
version = "0.3.2"
version = "0.3.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "2a180dd8642fa45cdb7dd721cd4c11b1cadd4929ce112ebd8b9f5803cc79d536"
checksum = "1c10c2894a6fed806ade6027bcd50662746363a9589d3ec9d9bef30a4e4bc166"
dependencies = [
"bitflags 2.11.0",
]
@@ -6316,9 +6328,9 @@ checksum = "ef25abbcd74fb2609453eb695bd2f860d389e457f67dc17cafc8b8cbc89d0c33"
[[package]]
name = "objc2-io-kit"
version = "0.3.2"
version = "0.3.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "33fafba39597d6dc1fb709123dfa8289d39406734be322956a69f0931c73bb15"
checksum = "71c1c64d6120e51cd86033f67176b1cb66780c2efe34dec55176f77befd93c0a"
dependencies = [
"libc",
"objc2-core-foundation",
@@ -6335,9 +6347,9 @@ dependencies = [
[[package]]
name = "once_cell"
version = "1.21.4"
version = "1.21.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9f7c3e4beb33f85d45ae3e3a1792185706c8e16d043238c593331cc7cd313b50"
checksum = "42f5e15c9953c5e4ccceeb2e7382a716482c34515315f7b03532b8b4e8393d2d"
dependencies = [
"critical-section",
"portable-atomic",
@@ -6506,7 +6518,7 @@ dependencies = [
"opentelemetry",
"tracing",
"tracing-core",
"tracing-subscriber 0.3.23",
"tracing-subscriber 0.3.22",
]
[[package]]
@@ -7709,7 +7721,7 @@ dependencies = [
"csv",
"ctrlc",
"eyre",
"nix",
"nix 0.31.2",
"reth-chainspec",
"reth-cli-runner",
"reth-cli-util",
@@ -10422,7 +10434,7 @@ dependencies = [
"tracing-journald",
"tracing-logfmt",
"tracing-samply",
"tracing-subscriber 0.3.23",
"tracing-subscriber 0.3.22",
"tracing-tracy",
"tracy-client",
]
@@ -10440,7 +10452,7 @@ dependencies = [
"opentelemetry_sdk",
"tracing",
"tracing-opentelemetry",
"tracing-subscriber 0.3.23",
"tracing-subscriber 0.3.22",
"url",
]
@@ -11223,9 +11235,9 @@ dependencies = [
[[package]]
name = "schannel"
version = "0.1.29"
version = "0.1.28"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "91c1b7e4904c873ef0710c1f407dde2e6287de2bebc1bbbf7d430bb7cbffd939"
checksum = "891d81b926048e76efe18581bf793546b4c0eaf8448d72be8de2bbee5fd166e1"
dependencies = [
"windows-sys 0.61.2",
]
@@ -11498,9 +11510,9 @@ dependencies = [
[[package]]
name = "serde_with"
version = "3.18.0"
version = "3.17.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "dd5414fad8e6907dbdd5bc441a50ae8d6e26151a03b1de04d89a5576de61d01f"
checksum = "381b283ce7bc6b476d903296fb59d0d36633652b633b27f64db4fb46dcbfc3b9"
dependencies = [
"base64 0.22.1",
"chrono",
@@ -11517,11 +11529,11 @@ dependencies = [
[[package]]
name = "serde_with_macros"
version = "3.18.0"
version = "3.17.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d3db8978e608f1fe7357e211969fd9abdcae80bac1ba7a3369bb7eb6b404eb65"
checksum = "a6d4e30573c8cb306ed6ab1dca8423eec9a463ea0e155f45399455e0368b27e0"
dependencies = [
"darling 0.23.0",
"darling 0.21.3",
"proc-macro2",
"quote",
"syn 2.0.117",
@@ -11910,9 +11922,9 @@ dependencies = [
[[package]]
name = "sysinfo"
version = "0.38.4"
version = "0.38.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "92ab6a2f8bfe508deb3c6406578252e491d299cbbf3bc0529ecc3313aee4a52f"
checksum = "d03c61d2a49c649a15c407338afe7accafde9dac869995dccb73e5f7ef7d9034"
dependencies = [
"libc",
"memchr",
@@ -11953,9 +11965,9 @@ dependencies = [
[[package]]
name = "tempfile"
version = "3.27.0"
version = "3.26.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "32497e9a4c7b38532efcdebeef879707aa9f794296a4f0244f6f69e9bc8574bd"
checksum = "82a72c767771b47409d2345987fda8628641887d5466101319899796367354a0"
dependencies = [
"fastrand 2.3.0",
"getrandom 0.4.2",
@@ -12215,9 +12227,9 @@ dependencies = [
[[package]]
name = "tinyvec"
version = "1.11.0"
version = "1.10.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "3e61e67053d25a4e82c844e8424039d9745781b3fc4f32b8d55ed50f5f667ef3"
checksum = "bfa5fdc3bce6191a1dbc8c02d5c8bffcf557bafa17c124c5264a458f1b0613fa"
dependencies = [
"tinyvec_macros",
]
@@ -12502,7 +12514,7 @@ dependencies = [
"crossbeam-channel",
"thiserror 2.0.18",
"time",
"tracing-subscriber 0.3.23",
"tracing-subscriber 0.3.22",
]
[[package]]
@@ -12533,7 +12545,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8b1581020d7a273442f5b45074a6a57d5757ad0a47dac0e9f0bd57b81936f3db"
dependencies = [
"tracing",
"tracing-subscriber 0.3.23",
"tracing-subscriber 0.3.22",
]
[[package]]
@@ -12554,7 +12566,7 @@ checksum = "2d3a81ed245bfb62592b1e2bc153e77656d94ee6a0497683a65a12ccaf2438d0"
dependencies = [
"libc",
"tracing-core",
"tracing-subscriber 0.3.23",
"tracing-subscriber 0.3.22",
]
[[package]]
@@ -12577,7 +12589,7 @@ dependencies = [
"time",
"tracing",
"tracing-core",
"tracing-subscriber 0.3.23",
"tracing-subscriber 0.3.22",
]
[[package]]
@@ -12592,7 +12604,7 @@ dependencies = [
"tracing",
"tracing-core",
"tracing-log",
"tracing-subscriber 0.3.23",
"tracing-subscriber 0.3.22",
"web-time",
]
@@ -12609,7 +12621,7 @@ dependencies = [
"memmap2",
"smallvec",
"tracing-core",
"tracing-subscriber 0.3.23",
"tracing-subscriber 0.3.22",
]
[[package]]
@@ -12633,9 +12645,9 @@ dependencies = [
[[package]]
name = "tracing-subscriber"
version = "0.3.23"
version = "0.3.22"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "cb7f578e5945fb242538965c2d0b04418d38ec25c79d160cd279bf0731c8d319"
checksum = "2f30143827ddab0d256fd843b7a66d164e9f271cfa0dde49142c5ca0ca291f1e"
dependencies = [
"matchers",
"nu-ansi-term",
@@ -12659,7 +12671,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0eaa1852afa96e0fe9e44caa53dc0bd2d9d05e0f2611ce09f97f8677af56e4ba"
dependencies = [
"tracing-core",
"tracing-subscriber 0.3.23",
"tracing-subscriber 0.3.22",
"tracy-client",
]
@@ -13958,18 +13970,18 @@ dependencies = [
[[package]]
name = "zerocopy"
version = "0.8.42"
version = "0.8.41"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f2578b716f8a7a858b7f02d5bd870c14bf4ddbbcf3a4c05414ba6503640505e3"
checksum = "96e13bc581734df6250836c59a5f44f3c57db9f9acb9dc8e3eaabdaf6170254d"
dependencies = [
"zerocopy-derive",
]
[[package]]
name = "zerocopy-derive"
version = "0.8.42"
version = "0.8.41"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7e6cc098ea4d3bd6246687de65af3f920c430e236bee1e3bf2e441463f08a02f"
checksum = "3545ea9e86d12ab9bba9fcd99b54c1556fd3199007def5a03c375623d05fac1c"
dependencies = [
"proc-macro2",
"quote",

View File

@@ -89,6 +89,7 @@ default = [
"keccak-cache-global",
"asm-keccak",
"min-debug-logs",
"rocksdb",
]
otlp = [
@@ -190,6 +191,8 @@ min-trace-logs = [
]
trie-debug = ["reth-node-builder/trie-debug", "reth-node-core/trie-debug"]
rocksdb = ["reth-ethereum-cli/rocksdb", "reth-node-core/rocksdb"]
edge = ["rocksdb"]
[[bin]]
name = "reth"

View File

@@ -992,7 +992,7 @@ impl<N: NodePrimitives<SignedTx: SignedTransaction>> NewCanonicalChain<N> {
///
/// Returns the new tip for [`Self::Reorg`] and [`Self::Commit`] variants which commit at least
/// 1 new block.
pub fn tip(&self) -> &RecoveredBlock<N::Block> {
pub fn tip(&self) -> &SealedBlock<N::Block> {
match self {
Self::Commit { new } | Self::Reorg { new, .. } => {
new.last().expect("non empty blocks").recovered_block()

View File

@@ -110,6 +110,7 @@ reth-provider = { workspace = true, features = ["test-utils"] }
tempfile.workspace = true
[features]
default = []
arbitrary = [
"dep:proptest",
"dep:arbitrary",
@@ -134,3 +135,6 @@ arbitrary = [
"reth-primitives-traits/arbitrary",
"reth-ethereum-primitives/arbitrary",
]
rocksdb = ["reth-db-common/rocksdb", "reth-stages/rocksdb", "reth-provider/rocksdb", "reth-prune/rocksdb"]
edge = ["rocksdb"]

View File

@@ -21,6 +21,7 @@ use std::{
};
use tracing::{info, warn};
#[cfg(all(unix, feature = "rocksdb"))]
mod rocksdb;
/// Interval for logging progress during checksum computation.
@@ -72,6 +73,7 @@ enum Subcommand {
limit: Option<usize>,
},
/// Calculates the checksum of a RocksDB table
#[cfg(all(unix, feature = "rocksdb"))]
Rocksdb {
/// The RocksDB table
#[arg(value_enum)]
@@ -98,6 +100,7 @@ impl Command {
Subcommand::StaticFile { segment, start_block, end_block, limit } => {
checksum_static_file(tool, segment, start_block, end_block, limit)?;
}
#[cfg(all(unix, feature = "rocksdb"))]
Subcommand::Rocksdb { table, limit } => {
rocksdb::checksum_rocksdb(tool, table, limit)?;
}

View File

@@ -19,12 +19,11 @@ use reth_node_api::BlockTy;
use reth_node_events::node::NodeEvent;
use reth_provider::{
providers::ProviderNodeTypes, BlockNumReader, HeaderProvider, ProviderError, ProviderFactory,
RocksDBProviderFactory, StageCheckpointReader,
StageCheckpointReader,
};
use reth_prune::PruneModes;
use reth_stages::{prelude::*, ControlFlow, Pipeline, StageId, StageSet};
use reth_static_file::StaticFileProducer;
use reth_storage_api::StorageSettingsCache;
use std::{path::Path, sync::Arc};
use tokio::sync::watch;
use tracing::{debug, error, info, warn};
@@ -109,11 +108,7 @@ where
let provider = provider_factory.provider()?;
let init_blocks = provider.tx_ref().entries::<tables::HeaderNumbers>()?;
let init_txns = if provider_factory.cached_storage_settings().storage_v2 {
provider_factory.rocksdb_provider().iter::<tables::TransactionHashNumbers>()?.count()
} else {
provider.tx_ref().entries::<tables::TransactionHashNumbers>()?
};
let init_txns = provider.tx_ref().entries::<tables::TransactionHashNumbers>()?;
drop(provider);
let mut total_decoded_blocks = 0;
@@ -220,12 +215,8 @@ where
let provider = provider_factory.provider()?;
let total_imported_blocks = provider.tx_ref().entries::<tables::HeaderNumbers>()? - init_blocks;
let current_txns = if provider_factory.cached_storage_settings().storage_v2 {
provider_factory.rocksdb_provider().iter::<tables::TransactionHashNumbers>()?.count()
} else {
provider.tx_ref().entries::<tables::TransactionHashNumbers>()?
};
let total_imported_txns = current_txns - init_txns;
let total_imported_txns =
provider.tx_ref().entries::<tables::TransactionHashNumbers>()? - init_txns;
let result = ImportResult {
total_decoded_blocks,

View File

@@ -193,10 +193,7 @@ impl<C: ChainSpecParser> DownloadArgs<C> {
let default_secret_key_path = data_dir.p2p_secret();
let p2p_secret_key = self.network.secret_key(default_secret_key_path)?;
let rlpx_socket = (self.network.addr, self.network.port).into();
let boot_nodes = self
.network
.resolved_bootnodes()
.unwrap_or_else(|| self.chain.bootnodes().unwrap_or_default());
let boot_nodes = self.chain.bootnodes().unwrap_or_default();
let net =
NetworkConfigBuilder::<N::NetworkPrimitives>::new(p2p_secret_key, Runtime::test())

View File

@@ -12,6 +12,7 @@ use reth_node_metrics::{
server::{MetricServer, MetricServerConfig},
version::VersionInfo,
};
#[cfg(all(unix, feature = "rocksdb"))]
use reth_provider::RocksDBProviderFactory;
use reth_prune::PrunerBuilder;
use reth_static_file::StaticFileProducer;
@@ -121,6 +122,7 @@ impl<C: ChainSpecParser<ChainSpec: EthChainSpec + EthereumHardforks>> PruneComma
}
// Flush and compact RocksDB to reclaim disk space after pruning
#[cfg(all(unix, feature = "rocksdb"))]
{
info!(target: "reth::cli", "Flushing and compacting RocksDB...");
provider_factory.rocksdb_provider().flush_and_compact()?;

View File

@@ -75,3 +75,8 @@ path = "tests/e2e-testsuite/main.rs"
[[test]]
name = "rocksdb"
path = "tests/rocksdb/main.rs"
required-features = ["rocksdb"]
[features]
rocksdb = ["reth-node-core/rocksdb", "reth-provider/rocksdb", "reth-cli-commands/rocksdb"]
edge = ["rocksdb"]

View File

@@ -1,5 +1,7 @@
//! E2E tests for `RocksDB` provider functionality.
#![cfg(all(feature = "rocksdb", unix))]
use alloy_consensus::BlockHeader;
use alloy_primitives::B256;
use alloy_rpc_types_eth::{Transaction, TransactionReceipt};

View File

@@ -139,6 +139,13 @@ trie-debug = [
"reth-engine-primitives/trie-debug",
"dep:serde_json",
]
rocksdb = [
"reth-provider/rocksdb",
"reth-prune/rocksdb",
"reth-stages?/rocksdb",
"reth-e2e-test-utils/rocksdb",
]
edge = ["rocksdb"]
[[test]]
name = "e2e_testsuite"

View File

@@ -38,7 +38,10 @@ use reth_trie_parallel::{
proof_task::{ProofTaskCtx, ProofWorkerHandle},
root::ParallelStateRootError,
};
use reth_trie_sparse::ParallelismThresholds;
use reth_trie_sparse::{
ArenaParallelSparseTrie, ConfigurableSparseTrie, ParallelSparseTrie, ParallelismThresholds,
RevealableSparseTrie, SparseStateTrie,
};
use std::{
ops::Not,
sync::{
@@ -57,10 +60,7 @@ pub mod prewarm;
pub mod receipt_root_task;
pub mod sparse_trie;
pub use preserved_sparse_trie::{
PayloadSparseTrieCache, PayloadSparseTrieKind, PayloadSparseTrieStoreOutcome,
SparseTrieCheckout,
};
use preserved_sparse_trie::{PreservedSparseTrie, SharedPreservedSparseTrie};
/// Default parallelism thresholds to use with the [`ParallelSparseTrie`].
///
@@ -104,52 +104,6 @@ type IteratorPayloadHandle<Evm, I, N> = PayloadHandle<
<N as NodePrimitives>::Receipt,
>;
/// Shared cache handles that can be exported to engine consumers and downstream payload builders.
#[derive(Debug, Clone)]
pub struct EngineSharedCaches<Evm: ConfigureEvm> {
execution_cache: PayloadExecutionCache,
sparse_trie_cache: PayloadSparseTrieCache,
precompile_cache_map: PrecompileCacheMap<SpecFor<Evm>>,
}
impl<Evm> Default for EngineSharedCaches<Evm>
where
Evm: ConfigureEvm,
{
fn default() -> Self {
Self::with_sparse_trie_kind(PayloadSparseTrieKind::default())
}
}
impl<Evm> EngineSharedCaches<Evm>
where
Evm: ConfigureEvm,
{
/// Creates shared caches backed by the requested sparse trie implementation.
pub fn with_sparse_trie_kind(sparse_trie_kind: PayloadSparseTrieKind) -> Self {
Self {
execution_cache: Default::default(),
sparse_trie_cache: PayloadSparseTrieCache::new(sparse_trie_kind),
precompile_cache_map: Default::default(),
}
}
/// Returns the shared execution cache handle for engine-internal use.
pub(crate) fn execution_cache(&self) -> PayloadExecutionCache {
self.execution_cache.clone()
}
/// Returns the shared sparse trie cache handle.
pub fn sparse_trie_cache(&self) -> PayloadSparseTrieCache {
self.sparse_trie_cache.clone()
}
/// Returns the shared precompile cache map.
pub fn precompile_cache_map(&self) -> PrecompileCacheMap<SpecFor<Evm>> {
self.precompile_cache_map.clone()
}
}
/// Entrypoint for executing the payload.
#[derive(Debug)]
pub struct PayloadProcessor<Evm>
@@ -158,8 +112,8 @@ where
{
/// The executor used by to spawn tasks.
executor: Runtime,
/// Shared caches reused across payload processing.
shared_caches: EngineSharedCaches<Evm>,
/// The most recent cache used for execution.
execution_cache: PayloadExecutionCache,
/// Metrics for trie operations
trie_metrics: MultiProofTaskMetrics,
/// Cross-block cache size in bytes.
@@ -172,12 +126,20 @@ where
evm_config: Evm,
/// Whether precompile cache should be disabled.
precompile_cache_disabled: bool,
/// Precompile cache map.
precompile_cache_map: PrecompileCacheMap<SpecFor<Evm>>,
/// A pruned `SparseStateTrie`, kept around as a cache of already revealed trie nodes and to
/// re-use allocated memory. Stored with the block hash it was computed for to enable trie
/// preservation across sequential payload validations.
sparse_state_trie: SharedPreservedSparseTrie,
/// LFU hot-slot capacity: max storage slots retained across prune cycles.
sparse_trie_max_hot_slots: usize,
/// LFU hot-account capacity: max account addresses retained across prune cycles.
sparse_trie_max_hot_accounts: usize,
/// Whether sparse trie cache pruning is fully disabled.
disable_sparse_trie_cache_pruning: bool,
/// Whether to use the arena-based sparse trie implementation.
enable_arena_sparse_trie: bool,
/// Whether to disable cache metrics recording.
disable_cache_metrics: bool,
}
@@ -197,20 +159,23 @@ where
executor: Runtime,
evm_config: Evm,
config: &TreeConfig,
shared_caches: EngineSharedCaches<Evm>,
precompile_cache_map: PrecompileCacheMap<SpecFor<Evm>>,
) -> Self {
Self {
executor,
shared_caches,
execution_cache: Default::default(),
trie_metrics: Default::default(),
cross_block_cache_size: config.cross_block_cache_size(),
disable_transaction_prewarming: config.disable_prewarming(),
evm_config,
disable_state_cache: config.disable_state_cache(),
precompile_cache_disabled: config.precompile_cache_disabled(),
precompile_cache_map,
sparse_state_trie: SharedPreservedSparseTrie::default(),
sparse_trie_max_hot_slots: config.sparse_trie_max_hot_slots(),
sparse_trie_max_hot_accounts: config.sparse_trie_max_hot_accounts(),
disable_sparse_trie_cache_pruning: config.disable_sparse_trie_cache_pruning(),
enable_arena_sparse_trie: config.enable_arena_sparse_trie(),
disable_cache_metrics: config.disable_cache_metrics(),
}
}
@@ -224,8 +189,8 @@ where
debug!(target: "engine::tree::payload_processor", "Waiting for execution cache and sparse trie locks");
// Wait for both caches in parallel using std threads
let execution_cache = self.shared_caches.execution_cache();
let sparse_trie = self.shared_caches.sparse_trie_cache();
let execution_cache = self.execution_cache.clone();
let sparse_trie = self.sparse_state_trie.clone();
// Use channels and spawn_blocking instead of std::thread::spawn
let (execution_tx, execution_rx) = std::sync::mpsc::channel();
@@ -539,12 +504,12 @@ where
terminate_execution: Arc::new(AtomicBool::new(false)),
executed_tx_index: Arc::clone(&executed_tx_index),
precompile_cache_disabled: self.precompile_cache_disabled,
precompile_cache_map: self.shared_caches.precompile_cache_map(),
precompile_cache_map: self.precompile_cache_map.clone(),
};
let (prewarm_task, to_prewarm_task) = PrewarmCacheTask::new(
self.executor.clone(),
self.shared_caches.execution_cache(),
self.execution_cache.clone(),
prewarm_ctx,
to_multi_proof,
);
@@ -572,7 +537,7 @@ where
/// instance.
#[instrument(level = "debug", target = "engine::caching", skip(self))]
fn cache_for(&self, parent_hash: B256) -> SavedCache {
if let Some(cache) = self.shared_caches.execution_cache().get_cache_for(parent_hash) {
if let Some(cache) = self.execution_cache.get_cache_for(parent_hash) {
debug!("reusing execution cache");
cache
} else {
@@ -597,11 +562,12 @@ where
parent_state_root: B256,
chunk_size: usize,
) {
let sparse_trie_cache = self.shared_caches.sparse_trie_cache();
let preserved_sparse_trie = self.sparse_state_trie.clone();
let trie_metrics = self.trie_metrics.clone();
let max_hot_slots = self.sparse_trie_max_hot_slots;
let max_hot_accounts = self.sparse_trie_max_hot_accounts;
let disable_cache_pruning = self.disable_sparse_trie_cache_pruning;
let enable_arena_sparse_trie = self.enable_arena_sparse_trie;
let executor = self.executor.clone();
let parent_span = Span::current();
@@ -611,19 +577,49 @@ where
let _enter = debug_span!(target: "engine::tree::payload_processor", parent: parent_span, "sparse_trie_task")
.entered();
// Reuse a stored SparseStateTrie if available, applying continuation logic.
// If this payload's parent state root matches the preserved trie's anchor,
// we can reuse the pruned trie structure. Otherwise, we clear the trie but
// keep allocations.
let start = Instant::now();
let mut checkout = sparse_trie_cache.take_or_create_for(parent_state_root);
let preserved = preserved_sparse_trie.take();
trie_metrics
.sparse_trie_cache_wait_duration_histogram
.record(start.elapsed().as_secs_f64());
checkout.set_hot_cache_capacities(max_hot_slots, max_hot_accounts);
let mut task = SparseTrieCacheTask::new_with_checkout(
let mut sparse_state_trie = preserved
.map(|preserved| preserved.into_trie_for(parent_state_root))
.unwrap_or_else(|| {
debug!(
target: "engine::tree::payload_processor",
"Creating new sparse trie - no preserved trie available"
);
let default_trie = if enable_arena_sparse_trie {
RevealableSparseTrie::blind_from(
ConfigurableSparseTrie::Arena(ArenaParallelSparseTrie::default()),
)
} else {
RevealableSparseTrie::blind_from(
ConfigurableSparseTrie::HashMap(
ParallelSparseTrie::default().with_parallelism_thresholds(
PARALLEL_SPARSE_TRIE_PARALLELISM_THRESHOLDS,
),
),
)
};
SparseStateTrie::default()
.with_accounts_trie(default_trie.clone())
.with_default_storage_trie(default_trie)
.with_updates(true)
});
sparse_state_trie.set_hot_cache_capacities(max_hot_slots, max_hot_accounts);
let mut task = SparseTrieCacheTask::new_with_trie(
&executor,
from_multi_proof,
proof_worker_handle,
trie_metrics.clone(),
checkout,
sparse_state_trie,
chunk_size,
);
@@ -634,7 +630,7 @@ where
// causing take() to return None and forcing it to create a new empty trie
// instead of reusing the preserved one. Holding the guard ensures the next
// block's take() blocks until we've stored the trie for reuse.
let mut guard = sparse_trie_cache.lock();
let mut guard = preserved_sparse_trie.lock();
let task_result = result.as_ref().ok().cloned();
// Send state root computation result - next block may start but will block on take()
@@ -649,7 +645,7 @@ where
SPARSE_TRIE_MAX_NODES_SHRINK_CAPACITY,
SPARSE_TRIE_MAX_VALUES_SHRINK_CAPACITY,
);
trie.store_prepared_cleared_with_guard(&mut guard);
guard.store(PreservedSparseTrie::cleared(trie));
drop(guard);
executor.spawn_drop(deferred);
return;
@@ -678,7 +674,7 @@ where
trie_metrics
.sparse_trie_retained_storage_tries
.set(trie.retained_storage_tries_count() as f64);
trie.store_anchored_with_guard(&mut guard, result.state_root);
guard.store(PreservedSparseTrie::anchored(trie, result.state_root));
deferred
} else {
debug!(
@@ -689,7 +685,7 @@ where
SPARSE_TRIE_MAX_NODES_SHRINK_CAPACITY,
SPARSE_TRIE_MAX_VALUES_SHRINK_CAPACITY,
);
trie.store_prepared_cleared_with_guard(&mut guard);
guard.store(PreservedSparseTrie::cleared(trie));
deferred
};
drop(guard);
@@ -710,7 +706,7 @@ where
bundle_state: &BundleState,
) {
let disable_cache_metrics = self.disable_cache_metrics;
self.shared_caches.execution_cache().update_with_guard(|cached| {
self.execution_cache.update_with_guard(|cached| {
if cached.as_ref().is_some_and(|c| c.executed_block_hash() != block_with_parent.parent) {
debug!(
target: "engine::caching",
@@ -1014,7 +1010,7 @@ impl<R> Drop for CacheTaskHandle<R> {
/// - Prepares data for state root proof computation
/// - Runs concurrently but must not interfere with cache saves
#[derive(Clone, Debug, Default)]
pub(crate) struct PayloadExecutionCache {
pub struct PayloadExecutionCache {
/// Guarded cloneable cache identified by a block hash.
inner: Arc<RwLock<Option<SavedCache>>>,
/// Metrics for cache operations.
@@ -1022,11 +1018,11 @@ pub(crate) struct PayloadExecutionCache {
}
impl PayloadExecutionCache {
/// Returns the cache backing store for `parent_hash` if it's available for reuse.
/// Returns the cache for `parent_hash` if it's available for use.
///
/// If the tracked cache is available but keyed to a different parent hash, the cache is
/// cleared and returned so callers can reuse the underlying allocations without carrying over
/// stale state.
/// A cache is considered available when:
/// - It exists and matches the requested parent hash
/// - No other tasks are currently using it (checked via Arc reference count)
#[instrument(level = "debug", target = "engine::tree::payload_processor", skip(self))]
pub(crate) fn get_cache_for(&self, parent_hash: B256) -> Option<SavedCache> {
let start = Instant::now();
@@ -1065,7 +1061,7 @@ impl PayloadExecutionCache {
// and picking up polluted data if the fork block fails.
c.clear_with_hash(parent_hash);
}
return Some(c.clone());
return Some(c.clone())
} else if hash_matches {
self.metrics.execution_cache_in_use.increment(1);
}
@@ -1082,7 +1078,7 @@ impl PayloadExecutionCache {
/// This is useful for synchronization before starting payload processing.
///
/// Returns the time spent waiting for the lock.
pub(crate) fn wait_for_availability(&self) -> Duration {
pub fn wait_for_availability(&self) -> Duration {
let start = Instant::now();
// Acquire write lock to wait for any current holders to finish
let _guard = self.inner.write();
@@ -1110,7 +1106,7 @@ impl PayloadExecutionCache {
///
/// Violating this requirement can result in cache corruption, incorrect state data,
/// and potential consensus failures.
pub(crate) fn update_with_guard<F>(&self, update_fn: F)
pub fn update_with_guard<F>(&self, update_fn: F)
where
F: FnOnce(&mut Option<SavedCache>),
{
@@ -1179,9 +1175,8 @@ mod tests {
use super::PayloadExecutionCache;
use crate::tree::{
cached_state::{CachedStateMetrics, ExecutionCache, SavedCache},
payload_processor::{
evm_state_to_hashed_post_state, EngineSharedCaches, ExecutionEnv, PayloadProcessor,
},
payload_processor::{evm_state_to_hashed_post_state, ExecutionEnv, PayloadProcessor},
precompile_cache::PrecompileCacheMap,
StateProviderBuilder, TreeConfig,
};
use alloy_eips::eip1898::{BlockNumHash, BlockWithParent};
@@ -1293,7 +1288,7 @@ mod tests {
reth_tasks::Runtime::test(),
EthEvmConfig::new(Arc::new(ChainSpec::default())),
&TreeConfig::default(),
EngineSharedCaches::default(),
PrecompileCacheMap::default(),
);
let parent_hash = B256::from([1u8; 32]);
@@ -1305,17 +1300,13 @@ mod tests {
let bundle_state = BundleState::default();
// Cache should be empty initially
assert!(payload_processor
.shared_caches
.execution_cache()
.get_cache_for(block_hash)
.is_none());
assert!(payload_processor.execution_cache.get_cache_for(block_hash).is_none());
// Update cache with inserted block
payload_processor.on_inserted_executed_block(block_with_parent, &bundle_state);
// Cache should now exist for the block hash
let cached = payload_processor.shared_caches.execution_cache().get_cache_for(block_hash);
let cached = payload_processor.execution_cache.get_cache_for(block_hash);
assert!(cached.is_some());
assert_eq!(cached.unwrap().executed_block_hash(), block_hash);
}
@@ -1326,14 +1317,13 @@ mod tests {
reth_tasks::Runtime::test(),
EthEvmConfig::new(Arc::new(ChainSpec::default())),
&TreeConfig::default(),
EngineSharedCaches::default(),
PrecompileCacheMap::default(),
);
// Setup: populate cache with block 1
let block1_hash = B256::from([1u8; 32]);
payload_processor
.shared_caches
.execution_cache()
.execution_cache
.update_with_guard(|slot| *slot = Some(make_saved_cache(block1_hash)));
// Try to insert block 3 with wrong parent (should skip and keep block 1's cache)
@@ -1348,11 +1338,11 @@ mod tests {
payload_processor.on_inserted_executed_block(block_with_parent, &bundle_state);
// Cache should still be for block 1 (unchanged)
let cached = payload_processor.shared_caches.execution_cache().get_cache_for(block1_hash);
let cached = payload_processor.execution_cache.get_cache_for(block1_hash);
assert!(cached.is_some(), "Original cache should be preserved");
// Cache for block 3 should not exist
let cached3 = payload_processor.shared_caches.execution_cache().get_cache_for(block3_hash);
let cached3 = payload_processor.execution_cache.get_cache_for(block3_hash);
assert!(cached3.is_none(), "New block cache should not be created on mismatch");
}
@@ -1462,7 +1452,7 @@ mod tests {
reth_tasks::Runtime::test(),
EthEvmConfig::new(factory.chain_spec()),
&TreeConfig::default(),
EngineSharedCaches::default(),
PrecompileCacheMap::default(),
);
let provider_factory = BlockchainProvider::new(factory).unwrap();

View File

@@ -1,128 +1,44 @@
//! Preserved sparse trie for reuse across payload validations.
use super::{
PARALLEL_SPARSE_TRIE_PARALLELISM_THRESHOLDS, SPARSE_TRIE_MAX_NODES_SHRINK_CAPACITY,
SPARSE_TRIE_MAX_VALUES_SHRINK_CAPACITY,
};
use alloy_primitives::B256;
use parking_lot::Mutex;
use reth_trie_sparse::{
ArenaParallelSparseTrie, ConfigurableSparseTrie, ParallelSparseTrie, RevealableSparseTrie,
SparseStateTrie,
};
use std::{
ops::{Deref, DerefMut},
sync::Arc,
time::{Duration, Instant},
};
use reth_trie_sparse::{ConfigurableSparseTrie, SparseStateTrie};
use std::{sync::Arc, time::Instant};
use tracing::debug;
/// Type alias for the sparse trie type used in preservation.
type SparseTrie = SparseStateTrie<ConfigurableSparseTrie, ConfigurableSparseTrie>;
pub(super) type SparseTrie = SparseStateTrie<ConfigurableSparseTrie, ConfigurableSparseTrie>;
/// Sparse trie implementation used by [`PayloadSparseTrieCache`].
#[derive(Debug, Default, Clone, Copy, PartialEq, Eq)]
pub enum PayloadSparseTrieKind {
/// Back sparse trie storage with hash maps.
#[default]
HashMap,
/// Back sparse trie storage with arena allocations.
Arena,
}
impl From<bool> for PayloadSparseTrieKind {
fn from(enable_arena_sparse_trie: bool) -> Self {
if enable_arena_sparse_trie {
Self::Arena
} else {
Self::HashMap
}
}
}
#[derive(Debug, Default)]
struct PayloadSparseTrieState {
latest_checkout_id: u64,
preserved: Option<PreservedSparseTrie>,
}
/// Outcome of storing a checked-out sparse trie back into the shared cache.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum PayloadSparseTrieStoreOutcome {
/// The checkout was the most recent lease and the trie was stored.
Stored,
/// A newer checkout had already been issued, so this stale lease was ignored.
IgnoredStaleCheckout,
}
/// Shared sparse trie cache that can be reused across payload validations.
/// Shared handle to a preserved sparse trie that can be reused across payload validations.
///
/// This is the public sparse-trie SDK surface exposed through
/// [`EngineSharedCaches`](super::EngineSharedCaches). Callers take or create a trie, use it for
/// payload work, then store it back either anchored to the resulting state root or cleared for
/// allocation reuse.
#[derive(Debug, Clone)]
pub struct PayloadSparseTrieCache {
kind: PayloadSparseTrieKind,
state: Arc<Mutex<PayloadSparseTrieState>>,
}
/// This is stored in [`PayloadProcessor`](super::PayloadProcessor) and cloned to pass to
/// [`SparseTrieCacheTask`](super::sparse_trie::SparseTrieCacheTask) for trie reuse.
#[derive(Debug, Default, Clone)]
pub(super) struct SharedPreservedSparseTrie(Arc<Mutex<Option<PreservedSparseTrie>>>);
impl Default for PayloadSparseTrieCache {
fn default() -> Self {
Self::new(PayloadSparseTrieKind::default())
}
}
impl PayloadSparseTrieCache {
/// Creates a sparse trie cache backed by the requested trie implementation.
pub fn new(kind: PayloadSparseTrieKind) -> Self {
Self { kind, state: Arc::new(Mutex::new(PayloadSparseTrieState::default())) }
impl SharedPreservedSparseTrie {
/// Takes the preserved trie if present, leaving `None` in its place.
pub(super) fn take(&self) -> Option<PreservedSparseTrie> {
self.0.lock().take()
}
/// Returns the sparse trie implementation used when the cache needs to create a new trie.
pub const fn kind(&self) -> PayloadSparseTrieKind {
self.kind
}
/// Takes a preserved trie for `parent_state_root` or creates a new trie if the cache is empty.
pub fn take_or_create_for(&self, parent_state_root: B256) -> SparseTrieCheckout {
let start = Instant::now();
let mut state = self.state.lock();
state.latest_checkout_id += 1;
let checkout_id = state.latest_checkout_id;
let trie = state
.preserved
.take()
.map(|preserved| preserved.into_trie_for(parent_state_root))
.unwrap_or_else(|| {
debug!(
target: "engine::tree::payload_processor",
%parent_state_root,
kind = ?self.kind,
"Creating new sparse trie - no preserved trie available"
);
new_sparse_trie(self.kind)
});
drop(state);
let elapsed = start.elapsed();
if elapsed.as_millis() > 5 {
debug!(
target: "engine::tree::payload_processor",
blocked_for=?elapsed,
"Waited for preserved sparse trie checkout"
);
}
SparseTrieCheckout { trie: Some(trie), cache: self.clone(), checkout_id }
/// Acquires a guard that blocks `take()` until dropped.
/// Use this before sending the state root result to ensure the next block
/// waits for the trie to be stored.
pub(super) fn lock(&self) -> PreservedTrieGuard<'_> {
PreservedTrieGuard(self.0.lock())
}
/// Waits until the sparse trie lock becomes available.
///
/// This acquires and immediately releases the lock, ensuring that any
/// ongoing operations complete before returning. Useful for synchronization
/// before starting payload processing.
///
/// Returns the time spent waiting for the lock.
pub fn wait_for_availability(&self) -> Duration {
pub(super) fn wait_for_availability(&self) -> std::time::Duration {
let start = Instant::now();
let _guard = self.state.lock();
let _guard = self.0.lock();
let elapsed = start.elapsed();
if elapsed.as_millis() > 5 {
debug!(
@@ -133,142 +49,27 @@ impl PayloadSparseTrieCache {
}
elapsed
}
/// Acquires a guard that blocks cache mutation until dropped.
///
/// Engine-internal code uses this before making the state-root result visible so the next
/// payload cannot observe an empty cache between send and store.
pub(super) fn lock(&self) -> PreservedTrieGuard<'_> {
PreservedTrieGuard { state: self.state.lock() }
}
}
/// A checked-out sparse trie lease.
///
/// This dereferences to [`SparseStateTrie`] so callers can reuse the trie directly. If the lease is
/// dropped without being stored back, a cleared trie is returned to the shared cache unless a newer
/// checkout has already superseded it.
#[derive(Debug)]
pub struct SparseTrieCheckout {
trie: Option<SparseTrie>,
cache: PayloadSparseTrieCache,
checkout_id: u64,
}
impl SparseTrieCheckout {
/// Stores the trie back into the shared cache anchored to the given state root.
pub fn store_anchored(self, state_root: B256) -> PayloadSparseTrieStoreOutcome {
let cache = self.cache.clone();
let mut guard = cache.lock();
self.store_anchored_with_guard(&mut guard, state_root)
}
/// Stores the trie back into the shared cache in a cleared state.
pub fn store_cleared(mut self) -> PayloadSparseTrieStoreOutcome {
let cache = self.cache.clone();
let mut trie = self.take_trie();
prepare_cleared_trie(&mut trie);
let deferred = trie.take_deferred_drops();
let mut guard = cache.lock();
let outcome = guard.store(self.checkout_id, PreservedSparseTrie::cleared(trie));
drop(guard);
drop(deferred);
outcome
}
/// Stores the trie back into the shared cache anchored to the given state root while the
/// caller is already holding the preservation lock.
pub(super) fn store_anchored_with_guard(
mut self,
guard: &mut PreservedTrieGuard<'_>,
state_root: B256,
) -> PayloadSparseTrieStoreOutcome {
guard.store(self.checkout_id, PreservedSparseTrie::anchored(self.take_trie(), state_root))
}
/// Stores an already-cleared trie back into the shared cache while the caller is already
/// holding the preservation lock.
pub(super) fn store_prepared_cleared_with_guard(
mut self,
guard: &mut PreservedTrieGuard<'_>,
) -> PayloadSparseTrieStoreOutcome {
guard.store(self.checkout_id, PreservedSparseTrie::cleared(self.take_trie()))
}
fn take_trie(&mut self) -> SparseTrie {
self.trie.take().expect("sparse trie checkout must hold a trie until it is stored")
}
}
impl Deref for SparseTrieCheckout {
type Target = SparseTrie;
fn deref(&self) -> &Self::Target {
self.trie.as_ref().expect("sparse trie checkout must hold a trie until it is stored")
}
}
impl DerefMut for SparseTrieCheckout {
fn deref_mut(&mut self) -> &mut Self::Target {
self.trie.as_mut().expect("sparse trie checkout must hold a trie until it is stored")
}
}
impl Drop for SparseTrieCheckout {
fn drop(&mut self) {
let Some(mut trie) = self.trie.take() else { return };
debug!(
target: "engine::tree::payload_processor",
checkout_id = self.checkout_id,
"Sparse trie checkout dropped before store, returning cleared trie to cache"
);
prepare_cleared_trie(&mut trie);
let deferred = trie.take_deferred_drops();
let mut guard = self.cache.lock();
let _ = guard.store(self.checkout_id, PreservedSparseTrie::cleared(trie));
drop(guard);
drop(deferred);
}
}
/// Guard that holds the lock on the preserved trie.
/// While held, take-or-create calls will block. Call `store()` to save the trie before dropping.
pub(super) struct PreservedTrieGuard<'a> {
state: parking_lot::MutexGuard<'a, PayloadSparseTrieState>,
}
/// While held, `take()` will block. Call `store()` to save the trie before dropping.
pub(super) struct PreservedTrieGuard<'a>(parking_lot::MutexGuard<'a, Option<PreservedSparseTrie>>);
impl PreservedTrieGuard<'_> {
/// Stores a preserved trie for later reuse if the checkout is still current.
fn store(
&mut self,
checkout_id: u64,
trie: PreservedSparseTrie,
) -> PayloadSparseTrieStoreOutcome {
if checkout_id != self.state.latest_checkout_id {
debug!(
target: "engine::tree::payload_processor",
checkout_id,
latest_checkout_id = self.state.latest_checkout_id,
"Ignoring stale sparse trie checkout"
);
return PayloadSparseTrieStoreOutcome::IgnoredStaleCheckout;
}
self.state.preserved.replace(trie);
PayloadSparseTrieStoreOutcome::Stored
/// Stores a preserved trie for later reuse.
pub(super) fn store(&mut self, trie: PreservedSparseTrie) {
self.0.replace(trie);
}
}
/// A preserved sparse trie that can be reused across payload validations.
///
/// The trie exists in one of two states:
/// - **Anchored**: Has a computed state root and can be reused for payloads whose parent state
/// root matches the anchor.
/// - **Anchored**: Has a computed state root and can be reused for payloads whose parent state root
/// matches the anchor.
/// - **Cleared**: Trie data has been cleared but allocations are preserved for reuse.
#[derive(Debug)]
enum PreservedSparseTrie {
pub(super) enum PreservedSparseTrie {
/// Trie with a computed state root that can be reused for continuation payloads.
Anchored {
/// The sparse state trie (pruned after root computation).
@@ -286,17 +87,24 @@ enum PreservedSparseTrie {
impl PreservedSparseTrie {
/// Creates a new anchored preserved trie.
const fn anchored(trie: SparseTrie, state_root: B256) -> Self {
///
/// The `state_root` is the computed state root from the trie, which becomes the
/// anchor for determining if subsequent payloads can reuse this trie.
pub(super) const fn anchored(trie: SparseTrie, state_root: B256) -> Self {
Self::Anchored { trie, state_root }
}
/// Creates a cleared preserved trie (allocations preserved, data cleared).
const fn cleared(trie: SparseTrie) -> Self {
pub(super) const fn cleared(trie: SparseTrie) -> Self {
Self::Cleared { trie }
}
/// Consumes self and returns the trie for reuse.
fn into_trie_for(self, parent_state_root: B256) -> SparseTrie {
///
/// If the preserved trie is anchored and the parent state root matches, the pruned
/// trie structure is reused directly. Otherwise, the trie is cleared but allocations
/// are preserved to reduce memory overhead.
pub(super) fn into_trie_for(self, parent_state_root: B256) -> SparseTrie {
match self {
Self::Anchored { trie, state_root } if state_root == parent_state_root => {
debug!(
@@ -327,111 +135,3 @@ impl PreservedSparseTrie {
}
}
}
fn new_sparse_trie(kind: PayloadSparseTrieKind) -> SparseTrie {
let default_trie = match kind {
PayloadSparseTrieKind::HashMap => {
RevealableSparseTrie::blind_from(ConfigurableSparseTrie::HashMap(
ParallelSparseTrie::default()
.with_parallelism_thresholds(PARALLEL_SPARSE_TRIE_PARALLELISM_THRESHOLDS),
))
}
PayloadSparseTrieKind::Arena => RevealableSparseTrie::blind_from(
ConfigurableSparseTrie::Arena(ArenaParallelSparseTrie::default()),
),
};
SparseStateTrie::default()
.with_accounts_trie(default_trie.clone())
.with_default_storage_trie(default_trie)
.with_updates(true)
}
fn prepare_cleared_trie(trie: &mut SparseTrie) {
trie.clear();
trie.shrink_to(SPARSE_TRIE_MAX_NODES_SHRINK_CAPACITY, SPARSE_TRIE_MAX_VALUES_SHRINK_CAPACITY);
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn take_or_create_reuses_matching_anchor() {
let cache = PayloadSparseTrieCache::default();
let state_root = B256::with_last_byte(1);
assert_eq!(
cache.take_or_create_for(state_root).store_anchored(state_root),
PayloadSparseTrieStoreOutcome::Stored
);
match cache.state.lock().preserved.as_ref() {
Some(PreservedSparseTrie::Anchored { state_root: anchored, .. }) => {
assert_eq!(*anchored, state_root);
}
other => panic!("expected anchored trie, got {other:?}"),
}
}
#[test]
fn drop_restores_cleared_trie() {
let cache = PayloadSparseTrieCache::default();
let state_root = B256::with_last_byte(2);
let mut checkout = cache.take_or_create_for(state_root);
checkout.set_updates(true);
drop(checkout);
match cache.state.lock().preserved.as_ref() {
Some(PreservedSparseTrie::Cleared { .. }) => {}
other => panic!("expected cleared trie, got {other:?}"),
}
}
#[test]
fn stale_checkout_does_not_overwrite_newer_store() {
let cache = PayloadSparseTrieCache::default();
let parent_state_root = B256::with_last_byte(3);
let anchored_state_root = B256::with_last_byte(4);
let stale = cache.take_or_create_for(parent_state_root);
let fresh = cache.take_or_create_for(parent_state_root);
assert_eq!(
fresh.store_anchored(anchored_state_root),
PayloadSparseTrieStoreOutcome::Stored
);
assert_eq!(stale.store_cleared(), PayloadSparseTrieStoreOutcome::IgnoredStaleCheckout);
match cache.state.lock().preserved.as_ref() {
Some(PreservedSparseTrie::Anchored { state_root, .. }) => {
assert_eq!(*state_root, anchored_state_root);
}
other => panic!("expected anchored trie to survive stale checkout, got {other:?}"),
}
}
#[test]
fn stale_checkout_drop_does_not_overwrite_newer_store() {
let cache = PayloadSparseTrieCache::default();
let parent_state_root = B256::with_last_byte(5);
let anchored_state_root = B256::with_last_byte(6);
let stale = cache.take_or_create_for(parent_state_root);
let fresh = cache.take_or_create_for(parent_state_root);
assert_eq!(
fresh.store_anchored(anchored_state_root),
PayloadSparseTrieStoreOutcome::Stored
);
drop(stale);
match cache.state.lock().preserved.as_ref() {
Some(PreservedSparseTrie::Anchored { state_root, .. }) => {
assert_eq!(*state_root, anchored_state_root);
}
other => panic!("expected anchored trie to survive stale checkout drop, got {other:?}"),
}
}
}

View File

@@ -84,7 +84,7 @@ where
Evm: ConfigureEvm<Primitives = N> + 'static,
{
/// Initializes the task with the given transactions pending execution
pub(crate) fn new(
pub fn new(
executor: Runtime,
execution_cache: PayloadExecutionCache,
ctx: PrewarmContext<N, P, Evm>,

View File

@@ -7,7 +7,7 @@ use crate::tree::{
dispatch_with_chunking, evm_state_to_hashed_post_state, MultiProofMessage,
DEFAULT_MAX_TARGETS_FOR_CHUNKING,
},
payload_processor::{multiproof::MultiProofTaskMetrics, SparseTrieCheckout},
payload_processor::multiproof::MultiProofTaskMetrics,
};
use alloy_primitives::B256;
use alloy_rlp::{Decodable, Encodable};
@@ -30,7 +30,7 @@ use reth_trie_parallel::{
use reth_trie_sparse::debug_recorder::TrieDebugRecorder;
use reth_trie_sparse::{
errors::SparseTrieResult, ConfigurableSparseTrie, DeferredDrops, LeafUpdate,
RevealableSparseTrie,
RevealableSparseTrie, SparseStateTrie, SparseTrie,
};
use revm_primitives::{hash_map::Entry, B256Map};
use tracing::{debug, debug_span, error, instrument, trace_span};
@@ -39,7 +39,7 @@ use tracing::{debug, debug_span, error, instrument, trace_span};
const MAX_PENDING_UPDATES: usize = 100;
/// Sparse trie task implementation that uses in-memory sparse trie data to schedule proof fetching.
pub(super) struct SparseTrieCacheTask {
pub(super) struct SparseTrieCacheTask<A = ConfigurableSparseTrie, S = ConfigurableSparseTrie> {
/// Sender for proof results.
proof_result_tx: CrossbeamSender<ProofResultMessage>,
/// Receiver for proof results directly from workers.
@@ -47,7 +47,7 @@ pub(super) struct SparseTrieCacheTask {
/// Receives updates from execution and prewarming.
updates: CrossbeamReceiver<SparseTrieTaskMessage>,
/// `SparseStateTrie` used for computing the state root.
trie: SparseTrieCheckout,
trie: SparseStateTrie<A, S>,
/// Handle to the proof worker pools (storage and account).
proof_worker_handle: ProofWorkerHandle,
@@ -110,14 +110,18 @@ pub(super) struct SparseTrieCacheTask {
metrics: MultiProofTaskMetrics,
}
impl SparseTrieCacheTask {
impl<A, S> SparseTrieCacheTask<A, S>
where
A: SparseTrie + Default,
S: SparseTrie + Default + Clone,
{
/// Creates a new sparse trie, pre-populating with an existing [`SparseStateTrie`].
pub(super) fn new_with_checkout(
pub(super) fn new_with_trie(
executor: &Runtime,
updates: CrossbeamReceiver<MultiProofMessage>,
proof_worker_handle: ProofWorkerHandle,
metrics: MultiProofTaskMetrics,
trie: SparseTrieCheckout,
trie: SparseStateTrie<A, S>,
chunk_size: usize,
) -> Self {
let (proof_result_tx, proof_result_rx) = crossbeam_channel::unbounded();
@@ -201,7 +205,7 @@ impl SparseTrieCacheTask {
max_values_capacity: usize,
disable_pruning: bool,
updates: &TrieUpdates,
) -> (SparseTrieCheckout, DeferredDrops) {
) -> (SparseStateTrie<A, S>, DeferredDrops) {
let Self { mut trie, .. } = self;
trie.commit_updates(updates);
if !disable_pruning {
@@ -220,7 +224,7 @@ impl SparseTrieCacheTask {
self,
max_nodes_capacity: usize,
max_values_capacity: usize,
) -> (SparseTrieCheckout, DeferredDrops) {
) -> (SparseStateTrie<A, S>, DeferredDrops) {
let Self { mut trie, .. } = self;
trie.clear();
trie.shrink_to(max_nodes_capacity, max_values_capacity);
@@ -302,9 +306,9 @@ impl SparseTrieCacheTask {
self.promote_pending_account_updates()?;
self.metrics.sparse_trie_process_updates_duration_histogram.record(t.elapsed());
if self.finished_state_updates
&& self.account_updates.is_empty()
&& self.storage_updates.iter().all(|(_, updates)| updates.is_empty())
if self.finished_state_updates &&
self.account_updates.is_empty() &&
self.storage_updates.iter().all(|(_, updates)| updates.is_empty())
{
break;
}
@@ -596,6 +600,7 @@ impl SparseTrieCacheTask {
Ok(updates_len_after < updates_len_before)
}
/// Computes storage roots for accounts whose storage updates are fully drained.
///
/// For each storage trie T that:
@@ -616,16 +621,16 @@ impl SparseTrieCacheTask {
.filter_map(|(address, updates)| updates.is_empty().then_some(*address))
.collect();
struct SendStorageTriePtr(*mut RevealableSparseTrie<ConfigurableSparseTrie>);
struct SendStorageTriePtr<S>(*mut RevealableSparseTrie<S>);
// SAFETY: this wrapper only forwards the pointer across rayon; deref invariants are
// documented at the use site below.
unsafe impl Send for SendStorageTriePtr {}
unsafe impl<S: Send> Send for SendStorageTriePtr<S> {}
let mut tries_to_compute_roots: Vec<(B256, SendStorageTriePtr)> =
let mut tries_to_compute_roots: Vec<(B256, SendStorageTriePtr<S>)> =
Vec::with_capacity(addresses_to_compute_roots.len());
for address in addresses_to_compute_roots {
if let Some(trie) = self.trie.storage_tries_mut().get_mut(&address)
&& !trie.is_root_cached()
if let Some(trie) = self.trie.storage_tries_mut().get_mut(&address) &&
!trie.is_root_cached()
{
tries_to_compute_roots.push((address, SendStorageTriePtr(trie)));
}
@@ -724,7 +729,7 @@ impl SparseTrieCacheTask {
// We need to keep iterating if any updates are being drained because that might
// indicate that more pending account updates can be promoted.
if num_promoted == 0 || !self.process_account_leaf_updates(false)? {
break;
break
}
}
@@ -845,6 +850,7 @@ pub struct StateRootComputeOutcome {
mod tests {
use super::*;
use alloy_primitives::{keccak256, Address, B256, U256};
use reth_trie_sparse::ArenaParallelSparseTrie;
#[test]
fn test_run_hashing_task_hashed_state_update_forwards() {
@@ -867,7 +873,10 @@ mod tests {
let expected_state = hashed_state.clone();
let handle = std::thread::spawn(move || {
SparseTrieCacheTask::run_hashing_task(updates_rx, hashed_state_tx);
SparseTrieCacheTask::<ArenaParallelSparseTrie, ArenaParallelSparseTrie>::run_hashing_task(
updates_rx,
hashed_state_tx,
);
});
updates_tx.send(MultiProofMessage::HashedStateUpdate(hashed_state)).unwrap();

View File

@@ -4,7 +4,7 @@ use crate::tree::{
cached_state::{CacheStats, CachedStateProvider},
error::{InsertBlockError, InsertBlockErrorKind, InsertPayloadError},
instrumented_state::{InstrumentedStateProvider, StateProviderStats},
payload_processor::{EngineSharedCaches, PayloadProcessor},
payload_processor::PayloadProcessor,
precompile_cache::{CachedPrecompile, CachedPrecompileMetrics, PrecompileCacheMap},
sparse_trie::StateRootComputeOutcome,
CacheWaitDurations, EngineApiMetrics, EngineApiTreeState, ExecutionEnv, PayloadHandle,
@@ -190,13 +190,16 @@ where
validator: V,
config: TreeConfig,
invalid_block_hook: Box<dyn InvalidBlockHook<N>>,
shared_caches: EngineSharedCaches<Evm>,
changeset_cache: ChangesetCache,
runtime: reth_tasks::Runtime,
) -> Self {
let precompile_cache_map = shared_caches.precompile_cache_map();
let payload_processor =
PayloadProcessor::new(runtime.clone(), evm_config.clone(), &config, shared_caches);
let precompile_cache_map = PrecompileCacheMap::default();
let payload_processor = PayloadProcessor::new(
runtime.clone(),
evm_config.clone(),
&config,
precompile_cache_map.clone(),
);
Self {
provider,
consensus,
@@ -310,7 +313,7 @@ where
// Validate block consensus rules which includes header validation
if let Err(consensus_err) = self.validate_block_inner(&block, None) {
// Header validation error takes precedence over execution error
return Err(InsertBlockError::new(block, consensus_err.into()).into());
return Err(InsertBlockError::new(block, consensus_err.into()).into())
}
// Also validate against the parent
@@ -318,7 +321,7 @@ where
self.consensus.validate_header_against_parent(block.sealed_header(), parent_block)
{
// Parent validation error takes precedence over execution error
return Err(InsertBlockError::new(block, consensus_err.into()).into());
return Err(InsertBlockError::new(block, consensus_err.into()).into())
}
// No header validation errors, return the original execution error
@@ -393,7 +396,7 @@ where
Ok(val) => val,
Err(e) => {
let block = convert_to_block(input)?;
return Err(InsertBlockError::new(block, e.into()).into());
return Err(InsertBlockError::new(block, e.into()).into())
}
}
};
@@ -426,7 +429,7 @@ where
convert_to_block(input)?,
ProviderError::HeaderNotFound(parent_hash.into()).into(),
)
.into());
.into())
};
let mut state_provider = ensure_ok!(provider_builder.build());
drop(_enter);
@@ -439,7 +442,7 @@ where
convert_to_block(input)?,
ProviderError::HeaderNotFound(parent_hash.into()).into(),
)
.into());
.into())
};
let evm_env = debug_span!(target: "engine::tree::payload_validator", "evm_env")
@@ -759,7 +762,7 @@ where
)
.into(),
)
.into());
.into())
}
let timing_stats = state_provider_stats.map(|stats| {
@@ -821,14 +824,14 @@ where
) -> Result<(), ConsensusError> {
if let Err(e) = self.consensus.validate_header(block.sealed_header()) {
error!(target: "engine::tree::payload_validator", ?block, "Failed to validate header {}: {e}", block.hash());
return Err(e);
return Err(e)
}
if let Err(e) =
self.consensus.validate_block_pre_execution_with_tx_root(block, transaction_root)
{
error!(target: "engine::tree::payload_validator", ?block, "Failed to validate block {}: {e}", block.hash());
return Err(e);
return Err(e)
}
Ok(())
@@ -1320,7 +1323,7 @@ where
trace!(target: "engine::tree::payload_validator", block=?block.num_hash(), "Validating block consensus");
// validate block consensus rules
if let Err(e) = self.validate_block_inner(block, transaction_root) {
return Err(e.into());
return Err(e.into())
}
// now validate against the parent
@@ -1329,7 +1332,7 @@ where
self.consensus.validate_header_against_parent(block.sealed_header(), parent_block)
{
warn!(target: "engine::tree::payload_validator", ?block, "Failed to validate header {} against parent: {e}", block.hash());
return Err(e.into());
return Err(e.into())
}
drop(_enter);
@@ -1342,7 +1345,7 @@ where
{
// call post-block hook
self.on_invalid_block(parent_block, block, output, None, ctx.state_mut());
return Err(err.into());
return Err(err.into())
}
drop(_enter);
@@ -1358,7 +1361,7 @@ where
{
// call post-block hook
self.on_invalid_block(parent_block, block, output, None, ctx.state_mut());
return Err(err.into());
return Err(err.into())
}
// record post-execution validation duration
@@ -1466,7 +1469,7 @@ where
self.provider.clone(),
historical,
Some(blocks),
)));
)))
}
// Check if the block is persisted
@@ -1474,7 +1477,7 @@ where
debug!(target: "engine::tree::payload_validator", %hash, number = %header.number(), "found canonical state for block in database, creating provider builder");
// For persisted blocks, we create a builder that will fetch state directly from the
// database
return Ok(Some(StateProviderBuilder::new(self.provider.clone(), hash, None)));
return Ok(Some(StateProviderBuilder::new(self.provider.clone(), hash, None)))
}
debug!(target: "engine::tree::payload_validator", %hash, "no canonical state found for block");
@@ -1506,7 +1509,7 @@ where
) {
if state.invalid_headers.get(&block.hash()).is_some() {
// we already marked this block as invalid
return;
return
}
self.invalid_block_hook.on_invalid_block(parent_header, block, output, trie_updates);
}

View File

@@ -203,7 +203,6 @@ impl TestHarness {
payload_validator,
TreeConfig::default(),
Box::new(NoopInvalidBlockHook::default()),
EngineSharedCaches::default(),
changeset_cache.clone(),
reth_tasks::Runtime::test(),
);
@@ -408,7 +407,6 @@ impl ValidatorTestHarness {
payload_validator,
TreeConfig::default(),
Box::new(NoopInvalidBlockHook::default()),
EngineSharedCaches::default(),
changeset_cache,
reth_tasks::Runtime::test(),
);

View File

@@ -1,27 +0,0 @@
//! SDK smoke tests for `EngineSharedCaches`.
use alloy_primitives::B256;
use reth_engine_tree::tree::{
EngineSharedCaches, PayloadSparseTrieKind, PayloadSparseTrieStoreOutcome,
};
use reth_evm_ethereum::EthEvmConfig;
#[test]
fn engine_shared_caches_exposes_public_sparse_trie_sdk() {
let caches =
EngineSharedCaches::<EthEvmConfig>::with_sparse_trie_kind(PayloadSparseTrieKind::Arena);
let _precompile_cache_map = caches.precompile_cache_map();
let sparse_trie_cache = caches.sparse_trie_cache();
assert_eq!(sparse_trie_cache.kind(), PayloadSparseTrieKind::Arena);
let state_root = B256::with_last_byte(1);
assert_eq!(
sparse_trie_cache.take_or_create_for(state_root).store_anchored(state_root),
PayloadSparseTrieStoreOutcome::Stored
);
let checkout = sparse_trie_cache.take_or_create_for(state_root);
assert!(checkout.memory_size() > 0 || checkout.retained_storage_tries_count() == 0);
}

View File

@@ -36,6 +36,8 @@ tracing.workspace = true
tempfile.workspace = true
[features]
default = []
otlp = ["reth-tracing/otlp", "reth-node-core/otlp"]
otlp-logs = ["reth-tracing/otlp-logs", "reth-node-core/otlp-logs"]
@@ -87,3 +89,6 @@ min-trace-logs = [
"tracing/release_max_level_trace",
"reth-node-core/min-trace-logs",
]
rocksdb = ["reth-cli-commands/rocksdb"]
edge = ["rocksdb"]

View File

@@ -85,7 +85,4 @@ serde = [
"alloy-rpc-types-eth?/serde",
"rand/serde",
]
rpc = [
"dep:alloy-rpc-types-eth",
"alloy-rpc-types-eth?/serde",
]
rpc = ["dep:alloy-rpc-types-eth"]

View File

@@ -66,6 +66,8 @@ secp256k1.workspace = true
tempfile.workspace = true
[features]
default = []
edge = ["reth-provider/edge"]
serde = [
"reth-exex-types/serde",
"reth-revm/serde",

View File

@@ -63,28 +63,6 @@ impl EthStreamError {
}
}
/// Returns whether this error indicates a protocol breach on the receive side.
///
/// These are errors caused by the remote peer sending invalid or malformed data
/// that warrant disconnecting with [`DisconnectReason::ProtocolBreach`].
pub const fn is_protocol_breach(&self) -> bool {
matches!(
self,
Self::InvalidMessage(_) |
Self::MessageTooBig(_) |
Self::TransactionHashesInvalidLenOfFields { .. } |
Self::UnsupportedMessage { .. } |
Self::P2PStreamError(
P2PStreamError::Rlp(_) |
P2PStreamError::Snap(_) |
P2PStreamError::MessageTooBig { .. } |
P2PStreamError::UnknownReservedMessageId(_) |
P2PStreamError::EmptyProtocolMessage |
P2PStreamError::UnknownDisconnectReason(_)
)
)
}
/// Returns the [`io::Error`] if it was caused by IO
pub const fn as_io(&self) -> Option<&io::Error> {
if let Self::P2PStreamError(P2PStreamError::Io(io)) = self {

View File

@@ -21,7 +21,7 @@ alloy-eip2124.workspace = true
# misc
serde = { workspace = true, optional = true }
humantime-serde = { workspace = true, optional = true }
serde_json = { workspace = true, features = ["std"], optional = true }
serde_json = { workspace = true, features = ["std"] }
# misc
tracing.workspace = true
@@ -30,7 +30,6 @@ tracing.workspace = true
serde = [
"dep:serde",
"dep:humantime-serde",
"dep:serde_json",
"alloy-eip2124/serde",
]
test-utils = []

View File

@@ -1,9 +1,15 @@
//! Configuration for peering.
use std::{collections::HashSet, time::Duration};
use std::{
collections::HashSet,
io::{self, ErrorKind},
path::Path,
time::Duration,
};
use reth_net_banlist::{BanList, IpFilter};
use reth_network_peers::{NodeRecord, TrustedPeer};
use tracing::info;
use crate::{peers::PersistedPeerInfo, BackoffKind, ReputationChangeWeights};
@@ -305,16 +311,16 @@ impl PeersConfig {
#[cfg(feature = "serde")]
pub fn with_basic_nodes_from_file(
mut self,
optional_file: Option<impl AsRef<std::path::Path>>,
) -> Result<Self, std::io::Error> {
optional_file: Option<impl AsRef<Path>>,
) -> Result<Self, io::Error> {
let Some(file_path) = optional_file else { return Ok(self) };
let raw = match std::fs::read_to_string(file_path.as_ref()) {
Ok(contents) => contents,
Err(e) if e.kind() == std::io::ErrorKind::NotFound => return Ok(self),
Err(e) if e.kind() == ErrorKind::NotFound => return Ok(self),
Err(e) => return Err(e),
};
tracing::info!(target: "net::peers", file = %file_path.as_ref().display(), "Loading saved peers");
info!(target: "net::peers", file = %file_path.as_ref().display(), "Loading saved peers");
// Try the new format first, fall back to legacy Vec<NodeRecord>
let peers: Vec<PersistedPeerInfo> = serde_json::from_str(&raw)
@@ -324,9 +330,9 @@ impl PeersConfig {
nodes.into_iter().map(PersistedPeerInfo::from_node_record).collect(),
)
})
.map_err(|e| std::io::Error::new(std::io::ErrorKind::InvalidData, e))?;
.map_err(|e| io::Error::new(io::ErrorKind::InvalidData, e))?;
tracing::info!(target: "net::peers", count = peers.len(), "Loaded persisted peers");
info!(target: "net::peers", count = peers.len(), "Loaded persisted peers");
self.persisted_peers = peers;
Ok(self)
}

View File

@@ -742,9 +742,7 @@ impl<N: NetworkPrimitives> Future for ActiveSession<N> {
}
OnIncomingMessageOutcome::BadMessage { error, message } => {
debug!(target: "net::session", %error, msg=?message, remote_peer_id=?this.remote_peer_id, "received invalid protocol message");
this.on_bad_message();
return this
.try_disconnect(DisconnectReason::ProtocolBreach, cx)
return this.close_on_error(error, cx)
}
OnIncomingMessageOutcome::NoCapacity(msg) => {
// failed to send due to lack of capacity
@@ -754,10 +752,6 @@ impl<N: NetworkPrimitives> Future for ActiveSession<N> {
}
Err(err) => {
debug!(target: "net::session", %err, remote_peer_id=?this.remote_peer_id, "failed to receive message");
if err.is_protocol_breach() {
this.on_bad_message();
return this.try_disconnect(DisconnectReason::ProtocolBreach, cx)
}
return this.close_on_error(err, cx)
}
}
@@ -972,7 +966,6 @@ mod tests {
GetBlockBodies, HelloMessageWithProtocols, P2PStream, StatusBuilder, UnauthedEthStream,
UnauthedP2PStream, UnifiedStatus,
};
use reth_eth_wire_types::{EthMessageID, RawCapabilityMessage};
use reth_ethereum_forks::EthereumHardfork;
use reth_network_peers::pk2id;
use reth_network_types::session::config::PROTOCOL_BREACH_REQUEST_TIMEOUT;
@@ -1168,40 +1161,6 @@ mod tests {
fut.await;
}
#[tokio::test(flavor = "multi_thread")]
async fn test_invalid_message_disconnects_with_protocol_breach() {
let mut builder = SessionBuilder::default();
let listener = TcpListener::bind("127.0.0.1:0").await.unwrap();
let local_addr = listener.local_addr().unwrap();
let fut = builder.with_client_stream(local_addr, async move |mut client_stream| {
client_stream
.start_send_raw(RawCapabilityMessage::eth(
EthMessageID::PooledTransactions,
vec![0xc0].into(),
))
.unwrap();
client_stream.flush().await.unwrap();
let msg = client_stream.next().await.unwrap().unwrap_err();
assert_eq!(msg.as_disconnected(), Some(DisconnectReason::ProtocolBreach));
});
let (tx, rx) = oneshot::channel();
tokio::task::spawn(async move {
let (incoming, _) = listener.accept().await.unwrap();
let session = builder.connect_incoming(incoming).await;
session.await;
tx.send(()).unwrap();
});
fut.await;
rx.await.unwrap();
}
#[tokio::test(flavor = "multi_thread")]
async fn handle_dropped_stream() {
let mut builder = SessionBuilder::default();

View File

@@ -442,12 +442,8 @@ impl<N: NetworkPrimitives> TransactionFetcher<N> {
search_durations.find_idle_peer
);
// peer may have disconnected between idle check and here, re-buffer hashes so they
// aren't lost from the pending fetch cache
let Some(peer) = peers.get(&peer_id) else {
self.buffer_hashes(hashes_to_request, None);
return false
};
// peer should always exist since `is_session_active` already checked
let Some(peer) = peers.get(&peer_id) else { return false };
let conn_eth_version = peer.version;
// fill the request with more hashes pending fetch that have been announced by the peer.
@@ -1453,26 +1449,6 @@ mod test {
)
}
#[test]
fn on_fetch_pending_hashes_rebuffers_on_disconnected_peer() {
let tx_fetcher = &mut TransactionFetcher::default();
let peer_1 = PeerId::new([1; 64]);
let peer_2 = PeerId::new([2; 64]);
let hash_1 = B256::from_slice(&[1; 32]);
buffer_hash_to_tx_fetcher(tx_fetcher, hash_1, peer_1, 0, Some(128));
buffer_hash_to_tx_fetcher(tx_fetcher, hash_1, peer_2, 0, Some(128));
assert_eq!(tx_fetcher.num_pending_hashes(), 1);
// pass empty peers map — both peers are "disconnected"
let peers = HashMap::new();
tx_fetcher.on_fetch_pending_hashes(&peers, |_| true);
// hash should be re-buffered, not lost
assert_eq!(tx_fetcher.num_pending_hashes(), 1);
}
#[test]
fn verify_response_hashes() {
let input = hex!(

View File

@@ -460,14 +460,6 @@ impl<Pool: TransactionPool, N: NetworkPrimitives> TransactionsManager<Pool, N> {
self.network.reputation_change(peer_id, ReputationChangeKind::AlreadySeenTransaction);
}
/// Handles a closed peer session, removing the peer from transaction-local tracking state.
fn on_peer_session_closed(&mut self, peer_id: &PeerId) {
if let Some(mut peer) = self.peers.remove(peer_id) {
self.policies.propagation_policy_mut().on_session_closed(&mut peer);
}
self.transaction_fetcher.remove_peer(peer_id);
}
/// Clear the transaction
fn on_good_import(&mut self, hash: TxHash) {
self.transactions_by_peers.remove(&hash);
@@ -1254,7 +1246,13 @@ where
fn on_network_event(&mut self, event_result: NetworkEvent<PeerRequest<N>>) {
match event_result {
NetworkEvent::Peer(PeerEvent::SessionClosed { peer_id, .. }) => {
self.on_peer_session_closed(&peer_id);
// remove the peer
let peer = self.peers.remove(&peer_id);
if let Some(mut peer) = peer {
self.policies.propagation_policy_mut().on_session_closed(&mut peer);
}
self.transaction_fetcher.remove_peer(&peer_id);
}
NetworkEvent::ActivePeerSession { info, messages } => {
// process active peer session and broadcast available transaction from the pool
@@ -2169,7 +2167,7 @@ mod tests {
NetworkConfigBuilder, NetworkManager,
};
use alloy_consensus::{TxEip1559, TxLegacy};
use alloy_primitives::{hex, Signature, TxKind, B256, U256};
use alloy_primitives::{hex, Signature, TxKind, U256};
use alloy_rlp::Decodable;
use futures::FutureExt;
use reth_chainspec::MIN_TRANSACTION_GAS;
@@ -2513,46 +2511,6 @@ mod tests {
handle.terminate().await;
}
#[tokio::test(flavor = "multi_thread")]
async fn test_session_closed_cleans_transaction_peer_state() {
let (mut tx_manager, _network) = new_tx_manager().await;
let peer_id = PeerId::new([1; 64]);
let fallback_peer = PeerId::new([2; 64]);
let (peer, _) = new_mock_session(peer_id, EthVersion::Eth66);
let hash_shared = B256::from_slice(&[1; 32]);
tx_manager.peers.insert(peer_id, peer);
buffer_hash_to_tx_fetcher(
&mut tx_manager.transaction_fetcher,
hash_shared,
peer_id,
0,
None,
);
buffer_hash_to_tx_fetcher(
&mut tx_manager.transaction_fetcher,
hash_shared,
fallback_peer,
0,
None,
);
tx_manager.transaction_fetcher.active_peers.insert(peer_id, 1);
tx_manager.on_network_event(NetworkEvent::Peer(PeerEvent::SessionClosed {
peer_id,
reason: None,
}));
// peer removed from peers map and active_peers
assert!(!tx_manager.peers.contains_key(&peer_id));
assert!(tx_manager.transaction_fetcher.active_peers.peek(&peer_id).is_none());
// fallback peer is still available for the hash
assert_eq!(
tx_manager.transaction_fetcher.get_idle_peer_for(hash_shared),
Some(&fallback_peer)
);
}
#[tokio::test(flavor = "multi_thread")]
async fn test_on_get_pooled_transactions_network() {
reth_tracing::init_test_tracing();

View File

@@ -32,7 +32,7 @@ use reth_node_core::{
primitives::Head,
};
use reth_provider::{
providers::{BlockchainProvider, NodeTypesForProvider, RocksDBProvider},
providers::{BlockchainProvider, NodeTypesForProvider},
ChainSpecProvider, FullProvider,
};
use reth_tasks::TaskExecutor;
@@ -154,14 +154,12 @@ pub struct NodeBuilder<DB, ChainSpec> {
config: NodeConfig<ChainSpec>,
/// The configured database for the node.
database: DB,
/// An optional [`RocksDBProvider`] to use instead of creating one during launch.
rocksdb_provider: Option<RocksDBProvider>,
}
impl<ChainSpec> NodeBuilder<(), ChainSpec> {
/// Create a new [`NodeBuilder`].
pub const fn new(config: NodeConfig<ChainSpec>) -> Self {
Self { config, database: (), rocksdb_provider: None }
Self { config, database: () }
}
}
@@ -230,13 +228,7 @@ impl<DB, ChainSpec> NodeBuilder<DB, ChainSpec> {
impl<DB, ChainSpec: EthChainSpec> NodeBuilder<DB, ChainSpec> {
/// Configures the underlying database that the node will use.
pub fn with_database<D>(self, database: D) -> NodeBuilder<D, ChainSpec> {
NodeBuilder { config: self.config, database, rocksdb_provider: self.rocksdb_provider }
}
/// Sets the [`RocksDBProvider`] to use instead of creating one during launch.
pub fn with_rocksdb_provider(mut self, rocksdb_provider: RocksDBProvider) -> Self {
self.rocksdb_provider = Some(rocksdb_provider);
self
NodeBuilder { config: self.config, database }
}
/// Preconfigure the builder with the context to launch the node.
@@ -305,7 +297,7 @@ where
T: NodeTypesForProvider<ChainSpec = ChainSpec>,
P: FullProvider<NodeTypesWithDBAdapter<T, DB>>,
{
NodeBuilderWithTypes::new(self.config, self.database, self.rocksdb_provider)
NodeBuilderWithTypes::new(self.config, self.database)
}
/// Preconfigures the node with a specific node implementation.
@@ -355,12 +347,6 @@ where
DB: Database + DatabaseMetrics + Clone + Unpin + 'static,
ChainSpec: EthChainSpec + EthereumHardforks,
{
/// Sets the [`RocksDBProvider`] to use instead of creating one during launch.
pub fn with_rocksdb_provider(mut self, rocksdb_provider: RocksDBProvider) -> Self {
self.builder.rocksdb_provider = Some(rocksdb_provider);
self
}
/// Configures the types of the node.
pub fn with_types<T>(self) -> WithLaunchContext<NodeBuilderWithTypes<RethFullAdapter<DB, T>>>
where

View File

@@ -16,7 +16,6 @@ use crate::{
use reth_exex::ExExContext;
use reth_node_api::{FullNodeComponents, FullNodeTypes, NodeAddOns, NodeTypes};
use reth_node_core::node_config::NodeConfig;
use reth_provider::providers::RocksDBProvider;
use reth_tasks::TaskExecutor;
use std::{fmt, fmt::Debug, future::Future};
@@ -26,8 +25,6 @@ pub struct NodeBuilderWithTypes<T: FullNodeTypes> {
config: NodeConfig<<T::Types as NodeTypes>::ChainSpec>,
/// The configured database for the node.
adapter: NodeTypesAdapter<T>,
/// An optional [`RocksDBProvider`] to use instead of creating one during launch.
rocksdb_provider: Option<RocksDBProvider>,
}
impl<T: FullNodeTypes> NodeBuilderWithTypes<T> {
@@ -35,9 +32,8 @@ impl<T: FullNodeTypes> NodeBuilderWithTypes<T> {
pub const fn new(
config: NodeConfig<<T::Types as NodeTypes>::ChainSpec>,
database: T::DB,
rocksdb_provider: Option<RocksDBProvider>,
) -> Self {
Self { config, adapter: NodeTypesAdapter::new(database), rocksdb_provider }
Self { config, adapter: NodeTypesAdapter::new(database) }
}
/// Advances the state of the node builder to the next state where all components are configured
@@ -45,12 +41,11 @@ impl<T: FullNodeTypes> NodeBuilderWithTypes<T> {
where
CB: NodeComponentsBuilder<T>,
{
let Self { config, adapter, rocksdb_provider } = self;
let Self { config, adapter } = self;
NodeBuilderWithComponents {
config,
adapter,
rocksdb_provider,
components_builder,
add_ons: AddOns { hooks: NodeHooks::default(), exexs: Vec::new(), add_ons: () },
}
@@ -155,8 +150,6 @@ pub struct NodeBuilderWithComponents<
pub config: NodeConfig<<T::Types as NodeTypes>::ChainSpec>,
/// Adapter for the underlying node types and database
pub adapter: NodeTypesAdapter<T>,
/// An optional [`RocksDBProvider`] to use instead of creating one during launch.
pub rocksdb_provider: Option<RocksDBProvider>,
/// container for type specific components
pub components_builder: CB,
/// Additional node extensions.
@@ -174,12 +167,11 @@ where
where
AO: NodeAddOns<NodeAdapter<T, CB::Components>>,
{
let Self { config, adapter, rocksdb_provider, components_builder, .. } = self;
let Self { config, adapter, components_builder, .. } = self;
NodeBuilderWithComponents {
config,
adapter,
rocksdb_provider,
components_builder,
add_ons: AddOns { hooks: NodeHooks::default(), exexs: Vec::new(), add_ons },
}

View File

@@ -472,7 +472,6 @@ where
pub async fn create_provider_factory<N, Evm>(
&self,
changeset_cache: ChangesetCache,
rocksdb_provider: Option<RocksDBProvider>,
) -> eyre::Result<ProviderFactory<N>>
where
N: ProviderNodeTypes<DB = DB, ChainSpec = ChainSpec>,
@@ -490,16 +489,12 @@ where
.with_genesis_block_number(self.chain_spec().genesis().number.unwrap_or_default())
.build()?;
// Use the provided RocksDB provider or create a new one
let rocksdb_provider = if let Some(provider) = rocksdb_provider {
provider
} else {
RocksDBProvider::builder(self.data_dir().rocksdb())
.with_default_tables()
.with_metrics()
.with_statistics()
.build()?
};
// Initialize RocksDB provider with metrics, statistics, and default tables
let rocksdb_provider = RocksDBProvider::builder(self.data_dir().rocksdb())
.with_default_tables()
.with_metrics()
.with_statistics()
.build()?;
let factory = ProviderFactory::new(
self.right().clone(),
@@ -578,14 +573,12 @@ where
pub async fn with_provider_factory<N, Evm>(
self,
changeset_cache: ChangesetCache,
rocksdb_provider: Option<RocksDBProvider>,
) -> eyre::Result<LaunchContextWith<Attached<WithConfigs<ChainSpec>, ProviderFactory<N>>>>
where
N: ProviderNodeTypes<DB = DB, ChainSpec = ChainSpec>,
Evm: ConfigureEvm<Primitives = N::Primitives> + 'static,
{
let factory =
self.create_provider_factory::<N, Evm>(changeset_cache, rocksdb_provider).await?;
let factory = self.create_provider_factory::<N, Evm>(changeset_cache).await?;
let ctx = LaunchContextWith {
inner: self.inner,
attachment: self.attachment.map_right(|_| factory),

View File

@@ -15,7 +15,7 @@ use reth_engine_tree::{
chain::{ChainEvent, FromOrchestrator},
engine::{EngineApiKind, EngineApiRequest, EngineRequestHandler},
launch::build_engine_orchestrator,
tree::{EngineSharedCaches, PayloadSparseTrieKind, TreeConfig},
tree::TreeConfig,
};
use reth_engine_util::EngineMessageStreamExt;
use reth_exex::ExExManagerHandle;
@@ -81,7 +81,6 @@ impl EngineNodeLauncher {
let Self { ctx, engine_tree_config } = self;
let NodeBuilderWithComponents {
adapter: NodeTypesAdapter { database },
rocksdb_provider,
components_builder,
add_ons: AddOns { hooks, exexs: installed_exex, add_ons },
config,
@@ -90,10 +89,6 @@ impl EngineNodeLauncher {
// Create changeset cache that will be shared across the engine
let changeset_cache = ChangesetCache::new();
let main_shared_caches =
EngineSharedCaches::<<CB::Components as NodeComponents<T>>::Evm>::with_sparse_trie_kind(
PayloadSparseTrieKind::from(engine_tree_config.enable_arena_sparse_trie()),
);
// setup the launch context
let ctx = ctx
@@ -107,7 +102,7 @@ impl EngineNodeLauncher {
// ensure certain settings take effect
.with_adjusted_configs()
// Create the provider factory with changeset cache
.with_provider_factory::<_, <CB::Components as NodeComponents<T>>::Evm>(changeset_cache.clone(), rocksdb_provider).await?
.with_provider_factory::<_, <CB::Components as NodeComponents<T>>::Evm>(changeset_cache.clone()).await?
.inspect(|_| {
info!(target: "reth::cli", "Database opened");
})
@@ -127,8 +122,7 @@ impl EngineNodeLauncher {
.with_blockchain_db::<T, _>(move |provider_factory| {
Ok(BlockchainProvider::new(provider_factory)?)
})?
.with_components(components_builder, on_component_initialized)
.await?;
.with_components(components_builder, on_component_initialized).await?;
// spawn exexs if any
let maybe_exex_manager_handle = ctx.launch_exex(installed_exex).await?;
@@ -199,12 +193,7 @@ impl EngineNodeLauncher {
// Build the engine validator with all required components
let engine_validator = validator_builder
.clone()
.build_tree_validator_with_caches(
&add_ons_ctx,
engine_tree_config.clone(),
changeset_cache.clone(),
main_shared_caches.clone(),
)
.build_tree_validator(&add_ons_ctx, engine_tree_config.clone(), changeset_cache.clone())
.await?;
// Create the consensus engine stream with optional reorg
@@ -217,18 +206,8 @@ impl EngineNodeLauncher {
|| async {
// Create a separate cache for reorg validator (not shared with main engine)
let reorg_cache = ChangesetCache::new();
let reorg_shared_caches = EngineSharedCaches::<
<CB::Components as NodeComponents<T>>::Evm,
>::with_sparse_trie_kind(
PayloadSparseTrieKind::from(engine_tree_config.enable_arena_sparse_trie()),
);
validator_builder
.build_tree_validator_with_caches(
&add_ons_ctx,
engine_tree_config.clone(),
reorg_cache,
reorg_shared_caches,
)
.build_tree_validator(&add_ons_ctx, engine_tree_config.clone(), reorg_cache)
.await
},
node_config.debug.reorg_frequency,

View File

@@ -1,8 +1,8 @@
//! Builder support for rpc components.
pub use jsonrpsee::server::middleware::rpc::{RpcService, RpcServiceBuilder};
use reth_engine_tree::tree::WaitForCaches;
pub use reth_engine_tree::tree::{BasicEngineValidator, EngineValidator};
use reth_engine_tree::tree::{EngineSharedCaches, PayloadSparseTrieKind, WaitForCaches};
pub use reth_rpc_builder::{middleware::RethRpcMiddleware, Identity, Stack};
pub use reth_trie_db::ChangesetCache;
@@ -981,8 +981,7 @@ where
let Self { eth_api_builder, engine_api_builder, hooks, .. } = self;
let engine_api = engine_api_builder.build_engine_api(&ctx).await?;
let AddOnsContext { node, config, beacon_engine_handle, jwt_secret, engine_events, .. } =
ctx;
let AddOnsContext { node, config, beacon_engine_handle, jwt_secret, engine_events } = ctx;
info!(target: "reth::cli", "Engine API handler initialized");
@@ -1295,25 +1294,6 @@ pub trait EngineValidatorBuilder<Node: FullNodeComponents>: Send + Sync + Clone
tree_config: TreeConfig,
changeset_cache: ChangesetCache,
) -> impl Future<Output = eyre::Result<Self::EngineValidator>> + Send;
/// Builds the tree validator using the shared cache handles exported by the launcher.
///
/// The default implementation preserves the legacy behavior and ignores the provided caches.
fn build_tree_validator_with_caches(
self,
ctx: &AddOnsContext<'_, Node>,
tree_config: TreeConfig,
changeset_cache: ChangesetCache,
shared_caches: EngineSharedCaches<Node::Evm>,
) -> impl Future<Output = eyre::Result<Self::EngineValidator>> + Send
where
Self: Sized,
{
async move {
let _ = shared_caches;
self.build_tree_validator(ctx, tree_config, changeset_cache).await
}
}
}
/// Basic implementation of [`EngineValidatorBuilder`].
@@ -1361,20 +1341,6 @@ where
ctx: &AddOnsContext<'_, Node>,
tree_config: TreeConfig,
changeset_cache: ChangesetCache,
) -> eyre::Result<Self::EngineValidator> {
let shared_caches = EngineSharedCaches::with_sparse_trie_kind(PayloadSparseTrieKind::from(
tree_config.enable_arena_sparse_trie(),
));
self.build_tree_validator_with_caches(ctx, tree_config, changeset_cache, shared_caches)
.await
}
async fn build_tree_validator_with_caches(
self,
ctx: &AddOnsContext<'_, Node>,
tree_config: TreeConfig,
changeset_cache: ChangesetCache,
shared_caches: EngineSharedCaches<Node::Evm>,
) -> eyre::Result<Self::EngineValidator> {
let validator = self.payload_validator_builder.build(ctx).await?;
let data_dir = ctx.config.datadir.clone().resolve_datadir(ctx.config.chain.chain());
@@ -1387,7 +1353,6 @@ where
validator,
tree_config,
invalid_block_hook,
shared_caches,
changeset_cache,
ctx.node.task_executor().clone(),
))

View File

@@ -93,6 +93,12 @@ min-trace-logs = ["tracing/release_max_level_trace"]
# Debug recording for sparse trie mutations
trie-debug = ["reth-engine-primitives/trie-debug"]
# Route supported tables to RocksDB instead of MDBX
rocksdb = ["reth-storage-api/rocksdb"]
# Marker feature for edge/unstable builds - enables rocksdb and sets v2 defaults
edge = ["rocksdb"]
[build-dependencies]
vergen = { workspace = true, features = ["build", "cargo", "emit_and_set"] }
vergen-git2.workspace = true

View File

@@ -42,10 +42,13 @@ rayon.workspace = true
tokio.workspace = true
rustc-hash.workspace = true
[features]
rocksdb = ["reth-provider/rocksdb"]
[dev-dependencies]
# reth
reth-db = { workspace = true, features = ["test-utils"] }
reth-stages = { workspace = true, features = ["test-utils"] }
reth-stages = { workspace = true, features = ["test-utils", "rocksdb"] }
reth-primitives-traits = { workspace = true, features = ["arbitrary"] }
reth-testing-utils.workspace = true
reth-tracing.workspace = true

View File

@@ -74,6 +74,7 @@ where
let range_end = *range.end();
// Check where account history indices are stored
#[cfg(all(unix, feature = "rocksdb"))]
if provider.cached_storage_settings().storage_v2 {
return self.prune_rocksdb(provider, input, range, range_end);
}
@@ -231,6 +232,7 @@ impl AccountHistory {
///
/// Reads account changesets from static files and prunes the corresponding
/// `RocksDB` history shards.
#[cfg(all(unix, feature = "rocksdb"))]
fn prune_rocksdb<Provider>(
&self,
provider: &Provider,
@@ -504,6 +506,157 @@ mod tests {
test_prune(1400, 3, (PruneProgress::Finished, 804));
}
/// Tests the `prune_static_files` code path. On unix with rocksdb feature, v2 storage
/// routes to `prune_rocksdb` instead, so this test only runs without rocksdb (the
/// `prune_rocksdb_path` test covers that configuration).
#[test]
#[cfg(not(all(unix, feature = "rocksdb")))]
fn prune_static_file() {
let db = TestStageDB::default();
let mut rng = generators::rng();
let blocks = random_block_range(
&mut rng,
0..=5000,
BlockRangeParams { parent: Some(B256::ZERO), tx_count: 0..1, ..Default::default() },
);
db.insert_blocks(blocks.iter(), StorageKind::Database(None)).expect("insert blocks");
let accounts = random_eoa_accounts(&mut rng, 2).into_iter().collect::<BTreeMap<_, _>>();
let (changesets, _) = random_changeset_range(
&mut rng,
blocks.iter(),
accounts.into_iter().map(|(addr, acc)| (addr, (acc, Vec::new()))),
0..0,
0..0,
);
db.insert_changesets_to_static_files(changesets.clone(), None)
.expect("insert changesets to static files");
db.insert_history(changesets.clone(), None).expect("insert history");
let account_occurrences = db.table::<tables::AccountsHistory>().unwrap().into_iter().fold(
BTreeMap::<_, usize>::new(),
|mut map, (key, _)| {
map.entry(key.key).or_default().add_assign(1);
map
},
);
assert!(account_occurrences.into_iter().any(|(_, occurrences)| occurrences > 1));
let original_shards = db.table::<tables::AccountsHistory>().unwrap();
let test_prune =
|to_block: BlockNumber, run: usize, expected_result: (PruneProgress, usize)| {
let prune_mode = PruneMode::Before(to_block);
let deleted_entries_limit = 2000;
let mut limiter =
PruneLimiter::default().set_deleted_entries_limit(deleted_entries_limit);
let input = PruneInput {
previous_checkpoint: db
.factory
.provider()
.unwrap()
.get_prune_checkpoint(PruneSegment::AccountHistory)
.unwrap(),
to_block,
limiter: limiter.clone(),
};
let segment = AccountHistory::new(prune_mode);
let provider = db.factory.database_provider_rw().unwrap();
provider.set_storage_settings_cache(StorageSettings::v2());
let result = segment.prune(&provider, input).unwrap();
limiter.increment_deleted_entries_count_by(result.pruned);
assert_matches!(
result,
SegmentOutput {progress, pruned, checkpoint: Some(_)}
if (progress, pruned) == expected_result
);
segment
.save_checkpoint(
&provider,
result.checkpoint.unwrap().as_prune_checkpoint(prune_mode),
)
.unwrap();
provider.commit().expect("commit");
let changesets = changesets
.iter()
.enumerate()
.flat_map(|(block_number, changeset)| {
changeset.iter().map(move |change| (block_number, change))
})
.collect::<Vec<_>>();
#[expect(clippy::skip_while_next)]
let pruned = changesets
.iter()
.enumerate()
.skip_while(|(i, (block_number, _))| {
*i < deleted_entries_limit / ACCOUNT_HISTORY_TABLES_TO_PRUNE * run &&
*block_number <= to_block as usize
})
.next()
.map(|(i, _)| i)
.unwrap_or_default();
// Skip what we've pruned so far, subtracting one to get last pruned block number
// further down
let mut pruned_changesets = changesets.iter().skip(pruned.saturating_sub(1));
let last_pruned_block_number = pruned_changesets
.next()
.map(|(block_number, _)| {
(if result.progress.is_finished() {
*block_number
} else {
block_number.saturating_sub(1)
}) as BlockNumber
})
.unwrap_or(to_block);
let actual_shards = db.table::<tables::AccountsHistory>().unwrap();
let expected_shards = original_shards
.iter()
.filter(|(key, _)| key.highest_block_number > last_pruned_block_number)
.map(|(key, blocks)| {
let new_blocks =
blocks.iter().skip_while(|block| *block <= last_pruned_block_number);
(key.clone(), BlockNumberList::new_pre_sorted(new_blocks))
})
.collect::<Vec<_>>();
assert_eq!(actual_shards, expected_shards);
assert_eq!(
db.factory
.provider()
.unwrap()
.get_prune_checkpoint(PruneSegment::AccountHistory)
.unwrap(),
Some(PruneCheckpoint {
block_number: Some(last_pruned_block_number),
tx_number: None,
prune_mode
})
);
};
test_prune(
998,
1,
(PruneProgress::HasMoreData(PruneInterruptReason::DeletedEntriesLimitReached), 1000),
);
test_prune(998, 2, (PruneProgress::Finished, 1000));
test_prune(1400, 3, (PruneProgress::Finished, 804));
}
#[cfg(all(unix, feature = "rocksdb"))]
#[test]
fn prune_rocksdb_path() {
use reth_db_api::models::ShardedKey;

View File

@@ -75,6 +75,7 @@ where
let range_end = *range.end();
// Check where storage history indices are stored
#[cfg(all(unix, feature = "rocksdb"))]
if provider.cached_storage_settings().storage_v2 {
return self.prune_rocksdb(provider, input, range, range_end);
}
@@ -235,6 +236,7 @@ impl StorageHistory {
///
/// Reads storage changesets from static files and prunes the corresponding
/// `RocksDB` history shards.
#[cfg(all(unix, feature = "rocksdb"))]
fn prune_rocksdb<Provider>(
&self,
provider: &Provider,
@@ -516,6 +518,159 @@ mod tests {
test_prune(1200, 3, (PruneProgress::Finished, 202));
}
/// Tests the `prune_static_files` code path. On unix with rocksdb feature, v2 storage
/// routes to `prune_rocksdb` instead, so this test only runs without rocksdb (the
/// `prune_rocksdb_path` test covers that configuration).
#[test]
#[cfg(not(all(unix, feature = "rocksdb")))]
fn prune_static_file() {
let db = TestStageDB::default();
let mut rng = generators::rng();
let blocks = random_block_range(
&mut rng,
0..=5000,
BlockRangeParams { parent: Some(B256::ZERO), tx_count: 0..1, ..Default::default() },
);
db.insert_blocks(blocks.iter(), StorageKind::Database(None)).expect("insert blocks");
let accounts = random_eoa_accounts(&mut rng, 2).into_iter().collect::<BTreeMap<_, _>>();
let (changesets, _) = random_changeset_range(
&mut rng,
blocks.iter(),
accounts.into_iter().map(|(addr, acc)| (addr, (acc, Vec::new()))),
1..2,
1..2,
);
db.insert_changesets_to_static_files(changesets.clone(), None)
.expect("insert changesets to static files");
db.insert_history(changesets.clone(), None).expect("insert history");
let storage_occurrences = db.table::<tables::StoragesHistory>().unwrap().into_iter().fold(
BTreeMap::<_, usize>::new(),
|mut map, (key, _)| {
map.entry((key.address, key.sharded_key.key)).or_default().add_assign(1);
map
},
);
assert!(storage_occurrences.into_iter().any(|(_, occurrences)| occurrences > 1));
let original_shards = db.table::<tables::StoragesHistory>().unwrap();
let test_prune = |to_block: BlockNumber,
run: usize,
expected_result: (PruneProgress, usize)| {
let prune_mode = PruneMode::Before(to_block);
let deleted_entries_limit = 1000;
let mut limiter =
PruneLimiter::default().set_deleted_entries_limit(deleted_entries_limit);
let input = PruneInput {
previous_checkpoint: db
.factory
.provider()
.unwrap()
.get_prune_checkpoint(PruneSegment::StorageHistory)
.unwrap(),
to_block,
limiter: limiter.clone(),
};
let segment = StorageHistory::new(prune_mode);
let provider = db.factory.database_provider_rw().unwrap();
provider.set_storage_settings_cache(StorageSettings::v2());
let result = segment.prune(&provider, input).unwrap();
limiter.increment_deleted_entries_count_by(result.pruned);
assert_matches!(
result,
SegmentOutput {progress, pruned, checkpoint: Some(_)}
if (progress, pruned) == expected_result
);
segment
.save_checkpoint(
&provider,
result.checkpoint.unwrap().as_prune_checkpoint(prune_mode),
)
.unwrap();
provider.commit().expect("commit");
let changesets = changesets
.iter()
.enumerate()
.flat_map(|(block_number, changeset)| {
changeset.iter().flat_map(move |(address, _, entries)| {
entries.iter().map(move |entry| (block_number, address, entry))
})
})
.collect::<Vec<_>>();
#[expect(clippy::skip_while_next)]
let pruned = changesets
.iter()
.enumerate()
.skip_while(|(i, (block_number, _, _))| {
*i < deleted_entries_limit / STORAGE_HISTORY_TABLES_TO_PRUNE * run &&
*block_number <= to_block as usize
})
.next()
.map(|(i, _)| i)
.unwrap_or_default();
// Skip what we've pruned so far, subtracting one to get last pruned block number
// further down
let mut pruned_changesets = changesets.iter().skip(pruned.saturating_sub(1));
let last_pruned_block_number = pruned_changesets
.next()
.map(|(block_number, _, _)| {
(if result.progress.is_finished() {
*block_number
} else {
block_number.saturating_sub(1)
}) as BlockNumber
})
.unwrap_or(to_block);
let actual_shards = db.table::<tables::StoragesHistory>().unwrap();
let expected_shards = original_shards
.iter()
.filter(|(key, _)| key.sharded_key.highest_block_number > last_pruned_block_number)
.map(|(key, blocks)| {
let new_blocks =
blocks.iter().skip_while(|block| *block <= last_pruned_block_number);
(key.clone(), BlockNumberList::new_pre_sorted(new_blocks))
})
.collect::<Vec<_>>();
assert_eq!(actual_shards, expected_shards);
assert_eq!(
db.factory
.provider()
.unwrap()
.get_prune_checkpoint(PruneSegment::StorageHistory)
.unwrap(),
Some(PruneCheckpoint {
block_number: Some(last_pruned_block_number),
tx_number: None,
prune_mode
})
);
};
test_prune(
998,
1,
(PruneProgress::HasMoreData(PruneInterruptReason::DeletedEntriesLimitReached), 500),
);
test_prune(998, 2, (PruneProgress::Finished, 500));
test_prune(1200, 3, (PruneProgress::Finished, 202));
}
/// Tests that when a limiter stops mid-block (with multiple storage changes for the same
/// block), the checkpoint is set to `block_number - 1` to avoid dangling index entries.
#[test]
@@ -666,6 +821,7 @@ mod tests {
assert!(final_changesets.is_empty(), "All changesets up to block 10 should be pruned");
}
#[cfg(all(unix, feature = "rocksdb"))]
#[test]
fn prune_rocksdb() {
use reth_db_api::models::storage_sharded_key::StorageShardedKey;

View File

@@ -95,6 +95,7 @@ where
.into_inner();
// Check where transaction hash numbers are stored
#[cfg(all(unix, feature = "rocksdb"))]
if provider.cached_storage_settings().storage_v2 {
return self.prune_rocksdb(provider, input, start, end);
}
@@ -195,6 +196,7 @@ impl TransactionLookup {
///
/// Reads transactions from static files and deletes corresponding entries
/// from the `RocksDB` `TransactionHashNumbers` table.
#[cfg(all(unix, feature = "rocksdb"))]
fn prune_rocksdb<Provider>(
&self,
provider: &Provider,
@@ -436,6 +438,7 @@ mod tests {
test_prune(10, (PruneProgress::Finished, 8));
}
#[cfg(all(unix, feature = "rocksdb"))]
#[test]
fn prune_rocksdb() {
use reth_db_api::models::StorageSettings;
@@ -536,6 +539,7 @@ mod tests {
/// 1. Some transactions have already been pruned (checkpoint at tx 5)
/// 2. The deleted entries limit is exhausted before any new deletions
/// 3. The checkpoint should NOT advance to the next start position
#[cfg(all(unix, feature = "rocksdb"))]
#[test]
fn prune_rocksdb_zero_deleted_checkpoint() {
use reth_db_api::models::StorageSettings;

View File

@@ -491,10 +491,6 @@ pub trait EthCall: EstimateCall + Call + LoadPendingBlock + LoadBlock + FullEthA
// Disabled because eth_createAccessList is sometimes used with non-eoa senders
evm_env.cfg_env.disable_eip3607 = true;
// Disable additional fee charges (e.g. L2 operator fees),
// consistent with prepare_call_env and estimate_gas_with.
evm_env.cfg_env.disable_fee_charge = true;
// Disable EIP-7825 transaction gas limit cap so that the gas limit
// fallback (block gas limit) is not rejected when it exceeds the
// per-tx cap (2^24 ≈ 16.7M post-Osaka).

View File

@@ -104,16 +104,11 @@ where
fork_timestamps.sort_unstable();
fork_timestamps.dedup();
let current_fork_idx = match fork_timestamps.iter().position(|ts| &latest.timestamp() < ts)
{
// All forks are in the past, use the last one.
None => fork_timestamps.len().checked_sub(1),
// First fork hasn't activated yet — no active timestamp fork.
Some(0) => None,
// Found a future fork; current is the one right before it.
Some(idx) => Some(idx - 1),
};
let (current_fork_idx, current_fork_timestamp) = current_fork_idx
let (current_fork_idx, current_fork_timestamp) = fork_timestamps
.iter()
.position(|ts| &latest.timestamp() < ts)
.and_then(|idx| idx.checked_sub(1))
.or_else(|| fork_timestamps.len().checked_sub(1))
.and_then(|idx| fork_timestamps.get(idx).map(|ts| (idx, *ts)))
.ok_or_else(|| RethError::msg("no active timestamp fork found"))?;

View File

@@ -42,7 +42,6 @@ pub trait EstimateCall: Call {
///
/// - `disable_eip3607` is set to `true`
/// - `disable_base_fee` is set to `true`
/// - `disable_fee_charge` is set to `true`
/// - `nonce` is set to `None`
fn estimate_gas_with<S>(
&self,
@@ -63,10 +62,6 @@ pub trait EstimateCall: Call {
// <https://github.com/ethereum/go-ethereum/blob/ee8e83fa5f6cb261dad2ed0a7bbcde4930c41e6c/internal/ethapi/api.go#L985>
evm_env.cfg_env.disable_base_fee = true;
// Disable additional fee charges (e.g. L2 operator fees) for gas estimation,
// consistent with `prepare_call_env` for `eth_call`.
evm_env.cfg_env.disable_fee_charge = true;
// set nonce to None so that the correct nonce is chosen by the EVM
request.as_mut().take_nonce();

View File

@@ -295,7 +295,6 @@ where
fn extract_reward_traces<H: BlockHeader>(
&self,
header: &H,
block_hash: BlockHash,
ommers: Option<&[H]>,
base_block_reward: u128,
) -> Vec<LocalizedTransactionTrace> {
@@ -304,7 +303,6 @@ where
let block_reward = block_reward(base_block_reward, ommers_cnt);
traces.push(reward_trace(
block_hash,
header,
RewardAction {
author: header.beneficiary(),
@@ -318,7 +316,6 @@ where
for uncle in ommers {
let uncle_reward = ommer_reward(base_block_reward, header.number(), uncle.number());
traces.push(reward_trace(
block_hash,
header,
RewardAction {
author: uncle.beneficiary(),
@@ -431,7 +428,6 @@ where
all_traces.extend(
self.extract_reward_traces(
block.header(),
block.hash(),
block.body().ommers(),
base_block_reward,
)
@@ -506,7 +502,6 @@ where
{
traces.extend(self.extract_reward_traces(
block.header(),
block.hash(),
block.body().ommers(),
base_block_reward,
));
@@ -800,13 +795,9 @@ pub struct BlockStorageAccess {
/// Helper to construct a [`LocalizedTransactionTrace`] that describes a reward to the block
/// beneficiary.
fn reward_trace<H: BlockHeader>(
block_hash: BlockHash,
header: &H,
reward: RewardAction,
) -> LocalizedTransactionTrace {
fn reward_trace<H: BlockHeader>(header: &H, reward: RewardAction) -> LocalizedTransactionTrace {
LocalizedTransactionTrace {
block_hash: Some(block_hash),
block_hash: Some(header.hash_slow()),
block_number: Some(header.number()),
transaction_hash: None,
transaction_position: None,

View File

@@ -378,7 +378,7 @@ impl<N: ProviderNodeTypes> Pipeline<N> {
});
}
// update finalized and safe block if needed
// update finalized block if needed
let last_saved_finalized_block_number =
provider_rw.last_finalized_block_number()?;
@@ -392,16 +392,6 @@ impl<N: ProviderNodeTypes> Pipeline<N> {
))?;
}
let last_saved_safe_block_number = provider_rw.last_safe_block_number()?;
if last_saved_safe_block_number.is_none() ||
Some(checkpoint.block_number) < last_saved_safe_block_number
{
provider_rw.save_safe_block_number(BlockNumber::from(
checkpoint.block_number,
))?;
}
provider_rw.commit()?;
stage.post_unwind_commit()?;

View File

@@ -121,3 +121,5 @@ test-utils = [
"reth-evm-ethereum/test-utils",
"reth-tasks/test-utils",
]
rocksdb = ["reth-provider/rocksdb", "reth-db-common/rocksdb"]
edge = ["rocksdb"]

View File

@@ -1,7 +1,9 @@
use super::collect_account_history_indices;
use crate::stages::utils::{collect_history_indices, load_account_history};
use reth_config::config::{EtlConfig, IndexHistoryConfig};
use reth_db_api::{models::ShardedKey, tables, transaction::DbTxMut, Tables};
#[cfg(all(unix, feature = "rocksdb"))]
use reth_db_api::Tables;
use reth_db_api::{models::ShardedKey, tables, transaction::DbTxMut};
use reth_provider::{
DBProvider, EitherWriter, HistoryWriter, PruneCheckpointReader, PruneCheckpointWriter,
RocksDBProviderFactory, StorageSettingsCache,
@@ -142,6 +144,7 @@ where
Ok(((), writer.into_raw_rocksdb_batch()))
})?;
#[cfg(all(unix, feature = "rocksdb"))]
if use_rocksdb {
provider.commit_pending_rocksdb_batches()?;
provider.rocksdb_provider().flush(&[Tables::AccountsHistory.name()])?;
@@ -660,6 +663,7 @@ mod tests {
}
}
#[cfg(all(unix, feature = "rocksdb"))]
mod rocksdb_tests {
use super::*;
use reth_provider::{

View File

@@ -1,11 +1,12 @@
use super::{collect_history_indices, collect_storage_history_indices};
use crate::{stages::utils::load_storage_history, StageCheckpoint, StageId};
use reth_config::config::{EtlConfig, IndexHistoryConfig};
#[cfg(all(unix, feature = "rocksdb"))]
use reth_db_api::Tables;
use reth_db_api::{
models::{storage_sharded_key::StorageShardedKey, AddressStorageKey, BlockNumberAddress},
tables,
transaction::DbTxMut,
Tables,
};
use reth_provider::{
DBProvider, EitherWriter, HistoryWriter, PruneCheckpointReader, PruneCheckpointWriter,
@@ -147,6 +148,7 @@ where
Ok(((), writer.into_raw_rocksdb_batch()))
})?;
#[cfg(all(unix, feature = "rocksdb"))]
if use_rocksdb {
provider.commit_pending_rocksdb_batches()?;
provider.rocksdb_provider().flush(&[Tables::StoragesHistory.name()])?;
@@ -689,6 +691,7 @@ mod tests {
}
}
#[cfg(all(unix, feature = "rocksdb"))]
mod rocksdb_tests {
use super::*;
use reth_db_api::models::StorageBeforeTx;

View File

@@ -2,11 +2,12 @@ use alloy_eips::eip2718::Encodable2718;
use alloy_primitives::{TxHash, TxNumber};
use num_traits::Zero;
use reth_config::config::{EtlConfig, TransactionLookupConfig};
#[cfg(all(unix, feature = "rocksdb"))]
use reth_db_api::Tables;
use reth_db_api::{
table::{Decode, Decompress, Value},
tables,
transaction::DbTxMut,
Tables,
};
use reth_etl::Collector;
use reth_primitives_traits::{NodePrimitives, SignedTransaction};
@@ -198,6 +199,7 @@ where
}
}
#[cfg(all(unix, feature = "rocksdb"))]
if provider.cached_storage_settings().storage_v2 {
provider.commit_pending_rocksdb_batches()?;
provider.rocksdb_provider().flush(&[Tables::TransactionHashNumbers.name()])?;
@@ -599,6 +601,7 @@ mod tests {
}
}
#[cfg(all(unix, feature = "rocksdb"))]
mod rocksdb_tests {
use super::*;
use reth_provider::RocksDBProviderFactory;

View File

@@ -36,3 +36,6 @@ reth-testing-utils.workspace = true
assert_matches.workspace = true
tempfile.workspace = true
[features]
edge = ["reth-stages/edge"]

View File

@@ -93,3 +93,5 @@ op = [
"reth-codecs/op",
"reth-primitives-traits/op",
]
rocksdb = []
edge = ["rocksdb"]

View File

@@ -28,8 +28,20 @@ pub struct StorageSettings {
impl StorageSettings {
/// Returns the default base `StorageSettings`.
///
/// When the `edge` feature is enabled, returns [`Self::v2()`] so that CI and
/// edge builds automatically use v2 storage defaults. Otherwise returns
/// [`Self::v1()`]. The `--storage.v2` CLI flag can also opt into v2 at runtime
/// regardless of feature flags.
pub const fn base() -> Self {
Self::v2()
#[cfg(feature = "edge")]
{
Self::v2()
}
#[cfg(not(feature = "edge"))]
{
Self::v1()
}
}
/// Creates `StorageSettings` for v2 nodes with all storage features enabled:

View File

@@ -46,5 +46,9 @@ reth-db = { workspace = true, features = ["mdbx"] }
reth-provider = { workspace = true, features = ["test-utils"] }
reth-tasks.workspace = true
[features]
rocksdb = ["reth-db-api/rocksdb", "reth-provider/rocksdb"]
edge = ["rocksdb"]
[lints]
workspace = true

View File

@@ -633,7 +633,7 @@ where
.ok_or_else(|| eyre::eyre!("Block hash not found for block {}", block))?;
let header = provider_rw
.header_by_number(block)?
.map(|h| SealedHeader::new(h, hash))
.map(SealedHeader::seal_slow)
.ok_or_else(|| ProviderError::HeaderNotFound(block.into()))?;
let expected_state_root = header.state_root();
@@ -1051,6 +1051,7 @@ mod tests {
)
};
#[cfg(feature = "rocksdb")]
{
let settings = factory.cached_storage_settings();
let rocksdb = factory.rocksdb_provider();
@@ -1078,6 +1079,13 @@ mod tests {
assert_eq!(accounts, expected_accounts);
assert_eq!(storages, expected_storages);
}
#[cfg(not(feature = "rocksdb"))]
{
let (accounts, storages) = collect_from_mdbx(&factory);
assert_eq!(accounts, expected_accounts);
assert_eq!(storages, expected_storages);
}
}
#[test]

View File

@@ -63,7 +63,9 @@ tokio = { workspace = true, features = ["sync"], optional = true }
# parallel utils
rayon.workspace = true
rocksdb = { workspace = true, features = ["jemalloc"] }
[target.'cfg(unix)'.dependencies]
# rocksdb: jemalloc is recommended production workload
rocksdb = { workspace = true, features = ["jemalloc"], optional = true }
[dev-dependencies]
reth-db = { workspace = true, features = ["test-utils"] }
@@ -85,6 +87,8 @@ rand.workspace = true
tokio = { workspace = true, features = ["sync", "macros", "rt-multi-thread"] }
[features]
rocksdb = ["reth-storage-api/rocksdb", "dep:rocksdb"]
edge = ["rocksdb"]
test-utils = [
"reth-db/test-utils",
"reth-nippy-jar/test-utils",

View File

@@ -7,11 +7,10 @@ use std::{
ops::{Range, RangeInclusive},
};
#[cfg(all(unix, feature = "rocksdb"))]
use crate::providers::rocksdb::RocksDBBatch;
use crate::{
providers::{
history_info, rocksdb::RocksDBBatch, HistoryInfo, StaticFileProvider,
StaticFileProviderRWRefMut,
},
providers::{history_info, HistoryInfo, StaticFileProvider, StaticFileProviderRWRefMut},
StaticFileProviderFactory,
};
use alloy_primitives::{map::HashMap, Address, BlockNumber, TxHash, TxNumber, B256};
@@ -63,16 +62,40 @@ type EitherWriterTy<'a, P, T> = EitherWriter<
>;
/// Helper type for `RocksDB` batch argument in writer constructors.
///
/// When `rocksdb` feature is enabled, this is a real `RocksDB` batch.
/// Otherwise, it's `()` (unit type) to allow the same API without feature gates.
#[cfg(all(unix, feature = "rocksdb"))]
pub type RocksBatchArg<'a> = crate::providers::rocksdb::RocksDBBatch<'a>;
/// Helper type for `RocksDB` batch argument in writer constructors.
///
/// When `rocksdb` feature is enabled, this is a real `RocksDB` batch.
/// Otherwise, it's `()` (unit type) to allow the same API without feature gates.
#[cfg(not(all(unix, feature = "rocksdb")))]
pub type RocksBatchArg<'a> = ();
/// The raw `RocksDB` batch type returned by [`EitherWriter::into_raw_rocksdb_batch`].
#[cfg(all(unix, feature = "rocksdb"))]
pub type RawRocksDBBatch = rocksdb::WriteBatchWithTransaction<true>;
/// The raw `RocksDB` batch type returned by [`EitherWriter::into_raw_rocksdb_batch`].
#[cfg(not(all(unix, feature = "rocksdb")))]
pub type RawRocksDBBatch = ();
/// Helper type for `RocksDB` transaction reference argument in reader constructors.
///
/// When `rocksdb` feature is enabled, this is an optional reference to a `RocksDB` transaction.
/// The `Option` allows callers to skip transaction creation when `RocksDB` isn't needed
/// (e.g., on legacy MDBX-only nodes).
/// When `rocksdb` feature is disabled, it's `()` (unit type) to allow the same API without
/// feature gates.
#[cfg(all(unix, feature = "rocksdb"))]
pub type RocksTxRefArg<'a> = Option<&'a crate::providers::rocksdb::RocksTx<'a>>;
/// Helper type for `RocksDB` transaction reference argument in reader constructors.
///
/// When `rocksdb` feature is disabled, it's `()` (unit type) to allow the same API without
/// feature gates.
#[cfg(not(all(unix, feature = "rocksdb")))]
pub type RocksTxRefArg<'a> = ();
/// Represents a destination for writing data, either to database, static files, or `RocksDB`.
#[derive(Debug, Display)]
@@ -82,6 +105,7 @@ pub enum EitherWriter<'a, CURSOR, N> {
/// Write to static file
StaticFile(StaticFileProviderRWRefMut<'a, N>),
/// Write to `RocksDB` using a write-only batch (historical tables).
#[cfg(all(unix, feature = "rocksdb"))]
RocksDB(RocksDBBatch<'a>),
}
@@ -230,6 +254,7 @@ impl<'a> EitherWriter<'a, (), ()> {
P: DBProvider + NodePrimitivesProvider + StorageSettingsCache,
P::Tx: DbTxMut,
{
#[cfg(all(unix, feature = "rocksdb"))]
if provider.cached_storage_settings().storage_v2 {
return Ok(EitherWriter::RocksDB(_rocksdb_batch));
}
@@ -246,6 +271,7 @@ impl<'a> EitherWriter<'a, (), ()> {
P: DBProvider + NodePrimitivesProvider + StorageSettingsCache,
P::Tx: DbTxMut,
{
#[cfg(all(unix, feature = "rocksdb"))]
if provider.cached_storage_settings().storage_v2 {
return Ok(EitherWriter::RocksDB(_rocksdb_batch));
}
@@ -264,6 +290,7 @@ impl<'a> EitherWriter<'a, (), ()> {
P: DBProvider + NodePrimitivesProvider + StorageSettingsCache,
P::Tx: DbTxMut,
{
#[cfg(all(unix, feature = "rocksdb"))]
if provider.cached_storage_settings().storage_v2 {
return Ok(EitherWriter::RocksDB(_rocksdb_batch));
}
@@ -280,6 +307,7 @@ impl<'a, CURSOR, N: NodePrimitives> EitherWriter<'a, CURSOR, N> {
///
/// This is used to defer `RocksDB` commits to the provider level, ensuring all
/// storage commits (MDBX, static files, `RocksDB`) happen atomically in a single place.
#[cfg(all(unix, feature = "rocksdb"))]
pub fn into_raw_rocksdb_batch(self) -> Option<rocksdb::WriteBatchWithTransaction<true>> {
match self {
Self::Database(_) | Self::StaticFile(_) => None,
@@ -287,6 +315,16 @@ impl<'a, CURSOR, N: NodePrimitives> EitherWriter<'a, CURSOR, N> {
}
}
/// Extracts the raw `RocksDB` write batch from this writer, if it contains one.
///
/// Without the `rocksdb` feature, this always returns `None`.
#[cfg(not(all(unix, feature = "rocksdb")))]
pub fn into_raw_rocksdb_batch(self) -> Option<RawRocksDBBatch> {
match self {
Self::Database(_) | Self::StaticFile(_) => None,
}
}
/// Increment the block number.
///
/// Relevant only for [`Self::StaticFile`]. It is a no-op for [`Self::Database`].
@@ -294,6 +332,7 @@ impl<'a, CURSOR, N: NodePrimitives> EitherWriter<'a, CURSOR, N> {
match self {
Self::Database(_) => Ok(()),
Self::StaticFile(writer) => writer.increment_block(expected_block_number),
#[cfg(all(unix, feature = "rocksdb"))]
Self::RocksDB(_) => Err(ProviderError::UnsupportedProvider),
}
}
@@ -308,6 +347,7 @@ impl<'a, CURSOR, N: NodePrimitives> EitherWriter<'a, CURSOR, N> {
match self {
Self::Database(_) => Ok(()),
Self::StaticFile(writer) => writer.ensure_at_block(block_number),
#[cfg(all(unix, feature = "rocksdb"))]
Self::RocksDB(_) => Err(ProviderError::UnsupportedProvider),
}
}
@@ -323,6 +363,7 @@ where
match self {
Self::Database(cursor) => Ok(cursor.append(tx_num, receipt)?),
Self::StaticFile(writer) => writer.append_receipt(tx_num, receipt),
#[cfg(all(unix, feature = "rocksdb"))]
Self::RocksDB(_) => Err(ProviderError::UnsupportedProvider),
}
}
@@ -337,6 +378,7 @@ where
match self {
Self::Database(cursor) => Ok(cursor.append(tx_num, sender)?),
Self::StaticFile(writer) => writer.append_transaction_sender(tx_num, sender),
#[cfg(all(unix, feature = "rocksdb"))]
Self::RocksDB(_) => Err(ProviderError::UnsupportedProvider),
}
}
@@ -354,6 +396,7 @@ where
Ok(())
}
Self::StaticFile(writer) => writer.append_transaction_senders(senders),
#[cfg(all(unix, feature = "rocksdb"))]
Self::RocksDB(_) => Err(ProviderError::UnsupportedProvider),
}
}
@@ -386,6 +429,7 @@ where
writer.prune_transaction_senders(to_delete, block)?;
}
#[cfg(all(unix, feature = "rocksdb"))]
Self::RocksDB(_) => return Err(ProviderError::UnsupportedProvider),
}
@@ -417,6 +461,7 @@ where
}
}
Self::StaticFile(_) => Err(ProviderError::UnsupportedProvider),
#[cfg(all(unix, feature = "rocksdb"))]
Self::RocksDB(batch) => batch.put::<tables::TransactionHashNumbers>(hash, &tx_num),
}
}
@@ -446,6 +491,7 @@ where
Ok(())
}
Self::StaticFile(_) => Err(ProviderError::UnsupportedProvider),
#[cfg(all(unix, feature = "rocksdb"))]
Self::RocksDB(batch) => {
for (hash, tx_num) in entries {
batch.put::<tables::TransactionHashNumbers>(hash, &tx_num)?;
@@ -465,6 +511,7 @@ where
Ok(())
}
Self::StaticFile(_) => Err(ProviderError::UnsupportedProvider),
#[cfg(all(unix, feature = "rocksdb"))]
Self::RocksDB(batch) => batch.delete::<tables::TransactionHashNumbers>(hash),
}
}
@@ -483,6 +530,7 @@ where
match self {
Self::Database(cursor) => Ok(cursor.upsert(key, value)?),
Self::StaticFile(_) => Err(ProviderError::UnsupportedProvider),
#[cfg(all(unix, feature = "rocksdb"))]
Self::RocksDB(batch) => batch.put::<tables::StoragesHistory>(key, value),
}
}
@@ -497,6 +545,7 @@ where
Ok(())
}
Self::StaticFile(_) => Err(ProviderError::UnsupportedProvider),
#[cfg(all(unix, feature = "rocksdb"))]
Self::RocksDB(batch) => batch.delete::<tables::StoragesHistory>(key),
}
}
@@ -510,6 +559,7 @@ where
match self {
Self::Database(cursor) => Ok(cursor.append(key, value)?),
Self::StaticFile(_) => Err(ProviderError::UnsupportedProvider),
#[cfg(all(unix, feature = "rocksdb"))]
Self::RocksDB(batch) => batch.put::<tables::StoragesHistory>(key, value),
}
}
@@ -523,6 +573,7 @@ where
match self {
Self::Database(cursor) => Ok(cursor.upsert(key, value)?),
Self::StaticFile(_) => Err(ProviderError::UnsupportedProvider),
#[cfg(all(unix, feature = "rocksdb"))]
Self::RocksDB(batch) => batch.put::<tables::StoragesHistory>(key, value),
}
}
@@ -537,6 +588,7 @@ where
match self {
Self::Database(cursor) => Ok(cursor.seek_exact(key)?.map(|(_, v)| v)),
Self::StaticFile(_) => Err(ProviderError::UnsupportedProvider),
#[cfg(all(unix, feature = "rocksdb"))]
Self::RocksDB(batch) => batch.get::<tables::StoragesHistory>(key),
}
}
@@ -555,6 +607,7 @@ where
match self {
Self::Database(cursor) => Ok(cursor.append(key, value)?),
Self::StaticFile(_) => Err(ProviderError::UnsupportedProvider),
#[cfg(all(unix, feature = "rocksdb"))]
Self::RocksDB(batch) => batch.put::<tables::AccountsHistory>(key, value),
}
}
@@ -568,6 +621,7 @@ where
match self {
Self::Database(cursor) => Ok(cursor.upsert(key, value)?),
Self::StaticFile(_) => Err(ProviderError::UnsupportedProvider),
#[cfg(all(unix, feature = "rocksdb"))]
Self::RocksDB(batch) => batch.put::<tables::AccountsHistory>(key, value),
}
}
@@ -582,6 +636,7 @@ where
Ok(cursor.seek_exact(ShardedKey::last(address))?.map(|(_, v)| v))
}
Self::StaticFile(_) => Err(ProviderError::UnsupportedProvider),
#[cfg(all(unix, feature = "rocksdb"))]
Self::RocksDB(batch) => batch.get::<tables::AccountsHistory>(ShardedKey::last(address)),
}
}
@@ -596,6 +651,7 @@ where
Ok(())
}
Self::StaticFile(_) => Err(ProviderError::UnsupportedProvider),
#[cfg(all(unix, feature = "rocksdb"))]
Self::RocksDB(batch) => batch.delete::<tables::AccountsHistory>(key),
}
}
@@ -624,6 +680,7 @@ where
Self::StaticFile(writer) => {
writer.append_account_changeset(changeset, block_number)?;
}
#[cfg(all(unix, feature = "rocksdb"))]
Self::RocksDB(_) => return Err(ProviderError::UnsupportedProvider),
}
@@ -658,6 +715,7 @@ where
Self::StaticFile(writer) => {
writer.append_storage_changeset(changeset, block_number)?;
}
#[cfg(all(unix, feature = "rocksdb"))]
Self::RocksDB(_) => return Err(ProviderError::UnsupportedProvider),
}
@@ -673,6 +731,7 @@ pub enum EitherReader<'a, CURSOR, N> {
/// Read from static file
StaticFile(StaticFileProvider<N>, PhantomData<&'a ()>),
/// Read from `RocksDB` transaction
#[cfg(all(unix, feature = "rocksdb"))]
RocksDB(&'a crate::providers::rocksdb::RocksTx<'a>),
}
@@ -704,6 +763,7 @@ impl<'a> EitherReader<'a, (), ()> {
P: DBProvider + NodePrimitivesProvider + StorageSettingsCache,
P::Tx: DbTx,
{
#[cfg(all(unix, feature = "rocksdb"))]
if provider.cached_storage_settings().storage_v2 {
return Ok(EitherReader::RocksDB(
_rocksdb_tx.expect("storages_history_in_rocksdb requires rocksdb tx"),
@@ -725,6 +785,7 @@ impl<'a> EitherReader<'a, (), ()> {
P: DBProvider + NodePrimitivesProvider + StorageSettingsCache,
P::Tx: DbTx,
{
#[cfg(all(unix, feature = "rocksdb"))]
if provider.cached_storage_settings().storage_v2 {
return Ok(EitherReader::RocksDB(
_rocksdb_tx.expect("transaction_hash_numbers_in_rocksdb requires rocksdb tx"),
@@ -746,6 +807,7 @@ impl<'a> EitherReader<'a, (), ()> {
P: DBProvider + NodePrimitivesProvider + StorageSettingsCache,
P::Tx: DbTx,
{
#[cfg(all(unix, feature = "rocksdb"))]
if provider.cached_storage_settings().storage_v2 {
return Ok(EitherReader::RocksDB(
_rocksdb_tx.expect("account_history_in_rocksdb requires rocksdb tx"),
@@ -803,6 +865,7 @@ where
Some(result.map(|sender| (tx_num, sender)))
})
.collect::<ProviderResult<HashMap<_, _>>>(),
#[cfg(all(unix, feature = "rocksdb"))]
Self::RocksDB(_) => Err(ProviderError::UnsupportedProvider),
}
}
@@ -820,6 +883,7 @@ where
match self {
Self::Database(cursor, _) => Ok(cursor.seek_exact(hash)?.map(|(_, v)| v)),
Self::StaticFile(_, _) => Err(ProviderError::UnsupportedProvider),
#[cfg(all(unix, feature = "rocksdb"))]
Self::RocksDB(tx) => tx.get::<tables::TransactionHashNumbers>(hash),
}
}
@@ -837,6 +901,7 @@ where
match self {
Self::Database(cursor, _) => Ok(cursor.seek_exact(key)?.map(|(_, v)| v)),
Self::StaticFile(_, _) => Err(ProviderError::UnsupportedProvider),
#[cfg(all(unix, feature = "rocksdb"))]
Self::RocksDB(tx) => tx.get::<tables::StoragesHistory>(key),
}
}
@@ -861,6 +926,7 @@ where
)
}
Self::StaticFile(_, _) => Err(ProviderError::UnsupportedProvider),
#[cfg(all(unix, feature = "rocksdb"))]
Self::RocksDB(tx) => tx.storage_history_info(
address,
storage_key,
@@ -883,6 +949,7 @@ where
match self {
Self::Database(cursor, _) => Ok(cursor.seek_exact(key)?.map(|(_, v)| v)),
Self::StaticFile(_, _) => Err(ProviderError::UnsupportedProvider),
#[cfg(all(unix, feature = "rocksdb"))]
Self::RocksDB(tx) => tx.get::<tables::AccountsHistory>(key),
}
}
@@ -906,6 +973,7 @@ where
)
}
Self::StaticFile(_, _) => Err(ProviderError::UnsupportedProvider),
#[cfg(all(unix, feature = "rocksdb"))]
Self::RocksDB(tx) => {
tx.account_history_info(address, block_number, lowest_available_block_number)
}
@@ -954,6 +1022,7 @@ where
entry.map(|(_, account_before)| account_before.address).map_err(Into::into)
})
.collect(),
#[cfg(all(unix, feature = "rocksdb"))]
Self::RocksDB(_) => Err(ProviderError::UnsupportedProvider),
}
}
@@ -1126,7 +1195,7 @@ mod tests {
}
}
#[cfg(test)]
#[cfg(all(test, unix, feature = "rocksdb"))]
mod rocksdb_tests {
use super::*;
use crate::{

View File

@@ -182,10 +182,12 @@ impl<N: ProviderNodeTypes> RocksDBProviderFactory for BlockchainProvider<N> {
self.database.rocksdb_provider()
}
#[cfg(all(unix, feature = "rocksdb"))]
fn set_pending_rocksdb_batch(&self, _batch: rocksdb::WriteBatchWithTransaction<true>) {
unimplemented!("BlockchainProvider wraps ProviderFactory - use DatabaseProvider::set_pending_rocksdb_batch instead")
}
#[cfg(all(unix, feature = "rocksdb"))]
fn commit_pending_rocksdb_batches(&self) -> ProviderResult<()> {
unimplemented!("BlockchainProvider wraps ProviderFactory - use DatabaseProvider::commit_pending_rocksdb_batches instead")
}

View File

@@ -1287,9 +1287,7 @@ impl<N: ProviderNodeTypes> BlockReaderIdExt for ConsistentProvider<N> {
) -> ProviderResult<Option<SealedHeader<HeaderTy<N>>>> {
Ok(match id {
BlockId::Number(num) => self.sealed_header_by_number_or_tag(num)?,
BlockId::Hash(hash) => self
.header(hash.block_hash)?
.map(|header| SealedHeader::new(header, hash.block_hash)),
BlockId::Hash(hash) => self.header(hash.block_hash)?.map(SealedHeader::seal_slow),
})
}

View File

@@ -28,8 +28,8 @@ use reth_prune_types::{PruneCheckpoint, PruneModes, PruneSegment};
use reth_stages_types::{PipelineTarget, StageCheckpoint, StageId};
use reth_static_file_types::StaticFileSegment;
use reth_storage_api::{
BlockBodyIndicesProvider, ChainStateBlockReader, ChainStateBlockWriter, NodePrimitivesProvider,
StorageSettings, StorageSettingsCache, TryIntoHistoricalStateProvider,
BlockBodyIndicesProvider, NodePrimitivesProvider, StorageSettings, StorageSettingsCache,
TryIntoHistoricalStateProvider,
};
use reth_storage_errors::provider::ProviderResult;
use reth_trie::HashedPostState;
@@ -40,7 +40,7 @@ use std::{
path::Path,
sync::Arc,
};
use tracing::{info, instrument, trace};
use tracing::{instrument, trace};
mod provider;
pub use provider::{
@@ -195,10 +195,12 @@ impl<N: NodeTypesWithDB> RocksDBProviderFactory for ProviderFactory<N> {
self.rocksdb_provider.clone()
}
#[cfg(all(unix, feature = "rocksdb"))]
fn set_pending_rocksdb_batch(&self, _batch: rocksdb::WriteBatchWithTransaction<true>) {
unimplemented!("ProviderFactory is a factory, not a provider - use DatabaseProvider::set_pending_rocksdb_batch instead")
}
#[cfg(all(unix, feature = "rocksdb"))]
fn commit_pending_rocksdb_batches(&self) -> ProviderResult<()> {
unimplemented!("ProviderFactory is a factory, not a provider - use DatabaseProvider::commit_pending_rocksdb_batches instead")
}
@@ -365,59 +367,8 @@ impl<N: ProviderNodeTypes> ProviderFactory<N> {
},
);
// Step 4: Heal finalized/safe block numbers that may be ahead of the
// highest header on nodes coming from <=1.10.2.
//
// Unwinds already set it to the target block.
if rocksdb_unwind.is_none() && static_file_unwind.is_none() {
self.heal_chain_state_block_numbers(&provider_ro)?;
}
Ok((rocksdb_unwind, static_file_unwind))
}
/// If the stored finalized or safe block number is ahead of the highest
/// header, resets it to the highest header.
fn heal_chain_state_block_numbers(
&self,
provider_ro: &DatabaseProvider<<N::DB as Database>::TX, N>,
) -> ProviderResult<()> {
let highest_header = self.last_block_number()?;
let finalized = provider_ro.last_finalized_block_number()?;
let safe = provider_ro.last_safe_block_number()?;
if finalized.is_none_or(|f| f <= highest_header) && safe.is_none_or(|s| s <= highest_header)
{
return Ok(());
}
let provider_rw = self.provider_rw()?;
if let Some(finalized) = finalized.filter(|&f| f > highest_header) {
info!(
target: "providers::db",
finalized,
highest_header,
"Healing finalized block number",
);
provider_rw.save_finalized_block_number(highest_header)?;
}
if let Some(safe) = safe.filter(|&s| s > highest_header) {
info!(
target: "providers::db",
safe,
highest_header,
"Healing safe block number",
);
provider_rw.save_safe_block_number(highest_header)?;
}
provider_rw.commit()?;
Ok(())
}
}
impl<N: NodeTypesWithDB> NodePrimitivesProvider for ProviderFactory<N> {

View File

@@ -205,6 +205,7 @@ pub struct DatabaseProvider<TX, N: NodeTypes> {
/// Path to the database directory.
db_path: PathBuf,
/// Pending `RocksDB` batches to be committed at provider commit time.
#[cfg_attr(not(all(unix, feature = "rocksdb")), allow(dead_code))]
pending_rocksdb_batches: PendingRocksDBBatches,
/// Commit order for database operations.
commit_order: CommitOrder,
@@ -322,10 +323,12 @@ impl<TX, N: NodeTypes> RocksDBProviderFactory for DatabaseProvider<TX, N> {
self.rocksdb_provider.clone()
}
#[cfg(all(unix, feature = "rocksdb"))]
fn set_pending_rocksdb_batch(&self, batch: rocksdb::WriteBatchWithTransaction<true>) {
self.pending_rocksdb_batches.lock().push(batch);
}
#[cfg(all(unix, feature = "rocksdb"))]
fn commit_pending_rocksdb_batches(&self) -> ProviderResult<()> {
let batches = std::mem::take(&mut *self.pending_rocksdb_batches.lock());
for batch in batches {
@@ -452,11 +455,16 @@ impl<TX: DbTx + DbTxMut + 'static, N: NodeTypesForProvider> DatabaseProvider<TX,
where
F: FnOnce(RocksBatchArg<'_>) -> ProviderResult<(R, Option<RawRocksDBBatch>)>,
{
#[cfg(all(unix, feature = "rocksdb"))]
let rocksdb = self.rocksdb_provider();
#[cfg(all(unix, feature = "rocksdb"))]
let rocksdb_batch = rocksdb.batch();
#[cfg(not(all(unix, feature = "rocksdb")))]
let rocksdb_batch = ();
let (result, raw_batch) = f(rocksdb_batch)?;
#[cfg(all(unix, feature = "rocksdb"))]
if let Some(batch) = raw_batch {
self.set_pending_rocksdb_batch(batch);
}
@@ -495,6 +503,7 @@ impl<TX: DbTx + DbTxMut + 'static, N: NodeTypesForProvider> DatabaseProvider<TX,
}
/// Creates the context for `RocksDB` writes.
#[cfg_attr(not(all(unix, feature = "rocksdb")), allow(dead_code))]
fn rocksdb_write_ctx(&self, first_block: BlockNumber) -> RocksDBWriteCtx {
RocksDBWriteCtx {
first_block_number: first_block,
@@ -553,11 +562,15 @@ impl<TX: DbTx + DbTxMut + 'static, N: NodeTypesForProvider> DatabaseProvider<TX,
// avoid capturing &self.tx in scope below.
let sf_provider = &self.static_file_provider;
let sf_ctx = self.static_file_write_ctx(save_mode, first_number, last_block_number)?;
#[cfg(all(unix, feature = "rocksdb"))]
let rocksdb_provider = self.rocksdb_provider.clone();
#[cfg(all(unix, feature = "rocksdb"))]
let rocksdb_ctx = self.rocksdb_write_ctx(first_number);
#[cfg(all(unix, feature = "rocksdb"))]
let rocksdb_enabled = rocksdb_ctx.storage_settings.storage_v2;
let mut sf_result = None;
#[cfg(all(unix, feature = "rocksdb"))]
let mut rocksdb_result = None;
// Write to all backends in parallel.
@@ -578,6 +591,7 @@ impl<TX: DbTx + DbTxMut + 'static, N: NodeTypesForProvider> DatabaseProvider<TX,
});
// RocksDB writes
#[cfg(all(unix, feature = "rocksdb"))]
if rocksdb_enabled {
s.spawn(|_| {
let _guard = span.enter();
@@ -699,6 +713,7 @@ impl<TX: DbTx + DbTxMut + 'static, N: NodeTypesForProvider> DatabaseProvider<TX,
// Collect results from spawned tasks
timings.sf = sf_result.ok_or(StaticFileWriterError::ThreadPanic("static file"))??;
#[cfg(all(unix, feature = "rocksdb"))]
if rocksdb_enabled {
timings.rocksdb = rocksdb_result.ok_or_else(|| {
ProviderError::Database(reth_db_api::DatabaseError::Other(
@@ -3256,8 +3271,11 @@ impl<TX: DbTxMut + DbTx + 'static, N: NodeTypes> HistoryWriter for DatabaseProvi
last_indices.sort_unstable_by_key(|(a, _)| *a);
if self.cached_storage_settings().storage_v2 {
let batch = self.rocksdb_provider.unwind_account_history_indices(&last_indices)?;
self.pending_rocksdb_batches.lock().push(batch);
#[cfg(all(unix, feature = "rocksdb"))]
{
let batch = self.rocksdb_provider.unwind_account_history_indices(&last_indices)?;
self.pending_rocksdb_batches.lock().push(batch);
}
} else {
// Unwind the account history index in MDBX.
let mut cursor = self.tx.cursor_write::<tables::AccountsHistory>()?;
@@ -3313,9 +3331,12 @@ impl<TX: DbTxMut + DbTx + 'static, N: NodeTypes> HistoryWriter for DatabaseProvi
storage_changesets.sort_by_key(|(address, key, _)| (*address, *key));
if self.cached_storage_settings().storage_v2 {
let batch =
self.rocksdb_provider.unwind_storage_history_indices(&storage_changesets)?;
self.pending_rocksdb_batches.lock().push(batch);
#[cfg(all(unix, feature = "rocksdb"))]
{
let batch =
self.rocksdb_provider.unwind_storage_history_indices(&storage_changesets)?;
self.pending_rocksdb_batches.lock().push(batch);
}
} else {
// Unwind the storage history index in MDBX.
let mut cursor = self.tx.cursor_write::<tables::StoragesHistory>()?;
@@ -3672,6 +3693,7 @@ impl<TX: DbTxMut + DbTx + 'static, N: NodeTypesForProvider> BlockWriter
// append_*_history_shard which handles read-merge-write internally.
let storage_settings = self.cached_storage_settings();
if storage_settings.storage_v2 {
#[cfg(all(unix, feature = "rocksdb"))]
self.with_rocksdb_batch(|mut batch| {
for (address, blocks) in account_transitions {
batch.append_account_history_shard(address, blocks)?;
@@ -3682,6 +3704,7 @@ impl<TX: DbTxMut + DbTx + 'static, N: NodeTypesForProvider> BlockWriter
self.insert_account_history_index(account_transitions)?;
}
if storage_settings.storage_v2 {
#[cfg(all(unix, feature = "rocksdb"))]
self.with_rocksdb_batch(|mut batch| {
for ((address, key), blocks) in storage_transitions {
batch.append_storage_history_shard(address, key, blocks)?;
@@ -3818,9 +3841,12 @@ impl<TX: DbTx + 'static, N: NodeTypes + 'static> DBProvider for DatabaseProvider
if self.static_file_provider.has_unwind_queued() || self.commit_order.is_unwind() {
self.tx.commit()?;
let batches = std::mem::take(&mut *self.pending_rocksdb_batches.lock());
for batch in batches {
self.rocksdb_provider.commit_batch(batch)?;
#[cfg(all(unix, feature = "rocksdb"))]
{
let batches = std::mem::take(&mut *self.pending_rocksdb_batches.lock());
for batch in batches {
self.rocksdb_provider.commit_batch(batch)?;
}
}
self.static_file_provider.commit()?;
@@ -3832,12 +3858,15 @@ impl<TX: DbTx + 'static, N: NodeTypes + 'static> DBProvider for DatabaseProvider
self.static_file_provider.finalize()?;
timings.sf = start.elapsed();
let start = Instant::now();
let batches = std::mem::take(&mut *self.pending_rocksdb_batches.lock());
for batch in batches {
self.rocksdb_provider.commit_batch(batch)?;
#[cfg(all(unix, feature = "rocksdb"))]
{
let start = Instant::now();
let batches = std::mem::take(&mut *self.pending_rocksdb_batches.lock());
for batch in batches {
self.rocksdb_provider.commit_batch(batch)?;
}
timings.rocksdb = start.elapsed();
}
timings.rocksdb = start.elapsed();
let start = Instant::now();
self.tx.commit()?;
@@ -5106,24 +5135,28 @@ mod tests {
}
}
let rocksdb = factory.rocksdb_provider();
for block_num in 1..=num_blocks {
for acct_idx in 0..accounts_per_block {
let address = Address::with_last_byte((block_num * 10 + acct_idx as u64) as u8);
let shards = rocksdb.account_history_shards(address).unwrap();
assert!(
!shards.is_empty(),
"v2: RocksDB AccountsHistory missing for block {block_num} acct {acct_idx}"
);
for s in 1..=slots_per_account as u64 {
let slot = U256::from(s + acct_idx as u64 * 100);
let slot_key = B256::from(slot);
let shards = rocksdb.storage_history_shards(address, slot_key).unwrap();
#[cfg(all(unix, feature = "rocksdb"))]
{
let rocksdb = factory.rocksdb_provider();
for block_num in 1..=num_blocks {
for acct_idx in 0..accounts_per_block {
let address =
Address::with_last_byte((block_num * 10 + acct_idx as u64) as u8);
let shards = rocksdb.account_history_shards(address).unwrap();
assert!(
!shards.is_empty(),
"v2: RocksDB StoragesHistory missing for block {block_num} acct {acct_idx} slot {s}"
"v2: RocksDB AccountsHistory missing for block {block_num} acct {acct_idx}"
);
for s in 1..=slots_per_account as u64 {
let slot = U256::from(s + acct_idx as u64 * 100);
let slot_key = B256::from(slot);
let shards = rocksdb.storage_history_shards(address, slot_key).unwrap();
assert!(
!shards.is_empty(),
"v2: RocksDB StoragesHistory missing for block {block_num} acct {acct_idx} slot {s}"
);
}
}
}
}
@@ -5334,6 +5367,7 @@ mod tests {
}
#[test]
#[cfg(all(unix, feature = "rocksdb"))]
fn test_unwind_storage_history_indices_v2() {
let factory = create_test_provider_factory();
factory.set_storage_settings_cache(StorageSettings::v2());

View File

@@ -32,6 +32,10 @@ pub use blockchain_provider::BlockchainProvider;
mod consistent;
pub use consistent::ConsistentProvider;
// RocksDB currently only supported on Unix platforms
// Windows support is planned for future releases
#[cfg_attr(all(unix, feature = "rocksdb"), path = "rocksdb/mod.rs")]
#[cfg_attr(not(all(unix, feature = "rocksdb")), path = "rocksdb_stub.rs")]
pub(crate) mod rocksdb;
pub use rocksdb::{

View File

@@ -0,0 +1,239 @@
//! Stub implementation of `RocksDB` provider.
//!
//! This module provides placeholder types that allow the code to compile when `RocksDB` is not
//! available (either on non-Unix platforms or when the `rocksdb` feature is not enabled).
//! All method calls are cfg-guarded in the calling code, so only type definitions are needed here.
use alloy_primitives::BlockNumber;
use metrics::Label;
use parking_lot::Mutex;
use reth_db_api::{database_metrics::DatabaseMetrics, models::StorageSettings};
use reth_prune_types::PruneMode;
use reth_storage_errors::{db::LogLevel, provider::ProviderResult};
use std::{path::Path, sync::Arc};
/// Pending `RocksDB` batches type alias (stub - uses unit type).
pub(crate) type PendingRocksDBBatches = Arc<Mutex<Vec<()>>>;
/// Statistics for a single `RocksDB` table (column family) - stub.
#[derive(Debug, Clone)]
pub struct RocksDBTableStats {
/// Size of SST files on disk in bytes.
pub sst_size_bytes: u64,
/// Size of memtables in memory in bytes.
pub memtable_size_bytes: u64,
/// Name of the table/column family.
pub name: String,
/// Estimated number of keys in the table.
pub estimated_num_keys: u64,
/// Estimated size of live data in bytes (SST files + memtables).
pub estimated_size_bytes: u64,
/// Estimated bytes pending compaction (reclaimable space).
pub pending_compaction_bytes: u64,
}
/// Database-level statistics for `RocksDB` - stub.
#[derive(Debug, Clone)]
pub struct RocksDBStats {
/// Statistics for each table (column family).
pub tables: Vec<RocksDBTableStats>,
/// Total size of WAL (Write-Ahead Log) files in bytes.
pub wal_size_bytes: u64,
}
/// Context for `RocksDB` block writes (stub).
#[derive(Debug, Clone)]
#[allow(dead_code)]
pub(crate) struct RocksDBWriteCtx {
/// The first block number being written.
pub first_block_number: BlockNumber,
/// The prune mode for transaction lookup, if any.
pub prune_tx_lookup: Option<PruneMode>,
/// Storage settings determining what goes to `RocksDB`.
pub storage_settings: StorageSettings,
/// Pending batches (stub - unused).
pub pending_batches: PendingRocksDBBatches,
}
/// A stub `RocksDB` provider.
///
/// This type exists to allow code to compile when `RocksDB` is not available (either on non-Unix
/// platforms or when the `rocksdb` feature is not enabled). All method calls on `RocksDBProvider`
/// are cfg-guarded in the calling code, so this stub only provides type definitions.
#[derive(Debug, Clone)]
pub struct RocksDBProvider;
impl RocksDBProvider {
/// Creates a new stub `RocksDB` provider.
pub fn new(_path: impl AsRef<Path>) -> ProviderResult<Self> {
Ok(Self)
}
/// Creates a new stub `RocksDB` provider builder.
pub fn builder(path: impl AsRef<Path>) -> RocksDBBuilder {
RocksDBBuilder::new(path)
}
/// Returns `true` if a `RocksDB` database exists at the given path (stub implementation).
///
/// Always returns `false` since `RocksDB` is not available.
pub fn exists(_path: impl AsRef<Path>) -> bool {
false
}
/// Check consistency of `RocksDB` tables (stub implementation).
///
/// Returns `None` since there is no `RocksDB` data to check when the feature is disabled.
pub const fn check_consistency<Provider>(
&self,
_provider: &Provider,
) -> ProviderResult<Option<BlockNumber>> {
Ok(None)
}
/// Returns statistics for all column families in the database (stub implementation).
///
/// Returns an empty vector since there is no `RocksDB` when the feature is disabled.
pub const fn table_stats(&self) -> Vec<RocksDBTableStats> {
Vec::new()
}
/// Clears all entries from the specified table (stub implementation).
///
/// This is a no-op since there is no `RocksDB` when the feature is disabled.
pub const fn clear<T>(&self) -> ProviderResult<()> {
Ok(())
}
/// Returns the total size of WAL (Write-Ahead Log) files in bytes (stub implementation).
///
/// Returns 0 since there is no `RocksDB` when the feature is disabled.
pub const fn wal_size_bytes(&self) -> u64 {
0
}
/// Returns database-level statistics including per-table stats and WAL size (stub
/// implementation).
///
/// Returns empty stats since there is no `RocksDB` when the feature is disabled.
pub const fn db_stats(&self) -> RocksDBStats {
RocksDBStats { tables: Vec::new(), wal_size_bytes: 0 }
}
/// Flushes all pending writes to disk (stub implementation).
///
/// This is a no-op since there is no `RocksDB` when the feature is disabled.
pub const fn flush(&self, _tables: &[&'static str]) -> ProviderResult<()> {
Ok(())
}
/// Creates an iterator over all entries in the specified table (stub implementation).
///
/// Returns an empty iterator since there is no `RocksDB` when the feature is disabled.
pub const fn iter<T: reth_db_api::table::Table>(&self) -> ProviderResult<RocksDBIter<T>> {
Ok(RocksDBIter(std::marker::PhantomData))
}
}
impl DatabaseMetrics for RocksDBProvider {
fn gauge_metrics(&self) -> Vec<(&'static str, f64, Vec<Label>)> {
vec![]
}
}
/// A stub batch writer for `RocksDB`.
#[derive(Debug)]
pub struct RocksDBBatch;
/// A stub builder for `RocksDB`.
#[derive(Debug)]
pub struct RocksDBBuilder;
impl RocksDBBuilder {
/// Creates a new stub builder.
pub fn new<P: AsRef<Path>>(_path: P) -> Self {
Self
}
/// Adds a column family for a specific table type (stub implementation).
pub const fn with_table<T>(self) -> Self {
self
}
/// Registers the default tables used by reth for `RocksDB` storage (stub implementation).
pub const fn with_default_tables(self) -> Self {
self
}
/// Enables metrics (stub implementation).
pub const fn with_metrics(self) -> Self {
self
}
/// Enables `RocksDB` internal statistics collection (stub implementation).
pub const fn with_statistics(self) -> Self {
self
}
/// Sets the log level from `DatabaseArgs` configuration (stub implementation).
pub const fn with_database_log_level(self, _log_level: Option<LogLevel>) -> Self {
self
}
/// Sets a custom block cache size (stub implementation).
pub const fn with_block_cache_size(self, _capacity_bytes: usize) -> Self {
self
}
/// Sets read-only mode (stub implementation).
pub const fn with_read_only(self, _read_only: bool) -> Self {
self
}
/// Build the `RocksDB` provider (stub implementation).
pub const fn build(self) -> ProviderResult<RocksDBProvider> {
Ok(RocksDBProvider)
}
}
/// A stub transaction for `RocksDB`.
#[derive(Debug)]
pub struct RocksTx;
/// A stub raw iterator for `RocksDB`.
#[derive(Debug)]
pub struct RocksDBRawIter;
/// A stub typed iterator for `RocksDB`.
#[derive(Debug)]
pub struct RocksDBIter<T: reth_db_api::table::Table>(std::marker::PhantomData<T>);
impl<T: reth_db_api::table::Table> Iterator for RocksDBIter<T> {
type Item = ProviderResult<(T::Key, T::Value)>;
fn next(&mut self) -> Option<Self::Item> {
None
}
}
/// Outcome of pruning a history shard in `RocksDB` (stub).
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum PruneShardOutcome {
/// Shard was deleted entirely.
Deleted,
/// Shard was updated with filtered block numbers.
Updated,
/// Shard was unchanged (no blocks <= `to_block`).
Unchanged,
}
/// Tracks pruning outcomes for batch operations (stub).
#[derive(Debug, Default, Clone, Copy)]
pub struct PrunedIndices {
/// Number of shards completely deleted.
pub deleted: usize,
/// Number of shards that were updated (filtered but still have entries).
pub updated: usize,
/// Number of shards that were unchanged.
pub unchanged: usize,
}

View File

@@ -1170,6 +1170,7 @@ mod tests {
}
#[test]
#[cfg(all(unix, feature = "rocksdb"))]
fn history_provider_get_storage_hashed_state() {
use crate::BlockWriter;
use alloy_primitives::keccak256;

View File

@@ -30,8 +30,10 @@ impl<C: Send + Sync, N: NodePrimitives> RocksDBProviderFactory for NoopProvider<
RocksDBProvider::builder(PathBuf::default()).build().unwrap()
}
#[cfg(all(unix, feature = "rocksdb"))]
fn set_pending_rocksdb_batch(&self, _batch: rocksdb::WriteBatchWithTransaction<true>) {}
#[cfg(all(unix, feature = "rocksdb"))]
fn commit_pending_rocksdb_batches(&self) -> ProviderResult<()> {
Ok(())
}

View File

@@ -16,6 +16,7 @@ pub trait RocksDBProviderFactory {
///
/// This allows deferring `RocksDB` commits to happen at the same time as MDBX and static file
/// commits, ensuring atomicity across all storage backends.
#[cfg(all(unix, feature = "rocksdb"))]
fn set_pending_rocksdb_batch(&self, batch: rocksdb::WriteBatchWithTransaction<true>);
/// Takes all pending `RocksDB` batches and commits them.
@@ -23,6 +24,7 @@ pub trait RocksDBProviderFactory {
/// This drains the pending batches from the lock and commits each one using the `RocksDB`
/// provider. Can be called before flush to persist `RocksDB` writes independently of the
/// full commit path.
#[cfg(all(unix, feature = "rocksdb"))]
fn commit_pending_rocksdb_batches(&self) -> ProviderResult<()>;
/// Executes a closure with a `RocksDB` transaction for reading.
@@ -35,12 +37,17 @@ pub trait RocksDBProviderFactory {
Self: StorageSettingsCache,
F: FnOnce(RocksTxRefArg<'_>) -> ProviderResult<R>,
{
if self.cached_storage_settings().storage_v2 {
let rocksdb = self.rocksdb_provider();
let tx = rocksdb.tx();
return f(Some(&tx));
#[cfg(all(unix, feature = "rocksdb"))]
{
if self.cached_storage_settings().storage_v2 {
let rocksdb = self.rocksdb_provider();
let tx = rocksdb.tx();
return f(Some(&tx));
}
f(None)
}
f(None)
#[cfg(not(all(unix, feature = "rocksdb")))]
f(())
}
/// Executes a closure with a `RocksDB` batch, automatically registering it for commit.
@@ -50,13 +57,21 @@ pub trait RocksDBProviderFactory {
where
F: FnOnce(RocksBatchArg<'_>) -> ProviderResult<(R, Option<RawRocksDBBatch>)>,
{
let rocksdb = self.rocksdb_provider();
let batch = rocksdb.batch();
let (result, raw_batch) = f(batch)?;
if let Some(b) = raw_batch {
self.set_pending_rocksdb_batch(b);
#[cfg(all(unix, feature = "rocksdb"))]
{
let rocksdb = self.rocksdb_provider();
let batch = rocksdb.batch();
let (result, raw_batch) = f(batch)?;
if let Some(b) = raw_batch {
self.set_pending_rocksdb_batch(b);
}
Ok(result)
}
#[cfg(not(all(unix, feature = "rocksdb")))]
{
let (result, _) = f(())?;
Ok(result)
}
Ok(result)
}
/// Executes a closure with a `RocksDB` batch that auto-commits on threshold.
@@ -68,17 +83,25 @@ pub trait RocksDBProviderFactory {
where
F: FnOnce(RocksBatchArg<'_>) -> ProviderResult<(R, Option<RawRocksDBBatch>)>,
{
let rocksdb = self.rocksdb_provider();
let batch = rocksdb.batch_with_auto_commit();
let (result, raw_batch) = f(batch)?;
if let Some(b) = raw_batch {
self.set_pending_rocksdb_batch(b);
#[cfg(all(unix, feature = "rocksdb"))]
{
let rocksdb = self.rocksdb_provider();
let batch = rocksdb.batch_with_auto_commit();
let (result, raw_batch) = f(batch)?;
if let Some(b) = raw_batch {
self.set_pending_rocksdb_batch(b);
}
Ok(result)
}
#[cfg(not(all(unix, feature = "rocksdb")))]
{
let (result, _) = f(())?;
Ok(result)
}
Ok(result)
}
}
#[cfg(test)]
#[cfg(all(test, unix, feature = "rocksdb"))]
mod tests {
use super::*;
use reth_db_api::models::StorageSettings;

View File

@@ -36,6 +36,8 @@ serde_json = { workspace = true, optional = true }
[features]
default = ["std"]
rocksdb = ["reth-db-api/rocksdb"]
edge = ["rocksdb"]
std = [
"reth-chainspec/std",
"alloy-consensus/std",

View File

@@ -740,8 +740,7 @@ where
sender: Address,
nonce: u64,
) -> Option<Arc<ValidPoolTransaction<Self::Transaction>>> {
let sender_id = self.pool.sender_id(&sender)?;
let transaction_id = TransactionId::new(sender_id, nonce);
let transaction_id = TransactionId::new(self.pool.get_sender_id(sender), nonce);
self.inner().get_pool_data().all().get(&transaction_id).map(|tx| tx.transaction.clone())
}

View File

@@ -215,24 +215,11 @@ where
self.notify_on_transaction_updates(outcome.promoted, outcome.discarded);
}
/// Returns the internal [`SenderId`] for this address, allocating a new mapping when the
/// address is first observed.
///
/// This must only be used on paths that intentionally begin tracking a sender, such as
/// transaction insertion. Read-only lookups should prefer [`Self::sender_id`] to avoid
/// growing the sender-id map for unknown addresses.
/// Returns the internal [`SenderId`] for this address
pub fn get_sender_id(&self, addr: Address) -> SenderId {
self.identifiers.write().sender_id_or_create(addr)
}
/// Returns the internal [`SenderId`] for this address if it is already tracked.
///
/// Unlike [`Self::get_sender_id`], this never allocates a new sender mapping and is therefore
/// suitable for read-only queries or best-effort cleanup on unknown addresses.
pub fn sender_id(&self, addr: &Address) -> Option<SenderId> {
self.identifiers.read().sender_id(addr)
}
/// Returns the internal [`SenderId`]s for the given addresses.
pub fn get_sender_ids(&self, addrs: impl IntoIterator<Item = Address>) -> Vec<SenderId> {
self.identifiers.write().sender_ids_or_create(addrs)
@@ -1091,7 +1078,7 @@ where
&self,
sender: Address,
) -> Vec<Arc<ValidPoolTransaction<T::Transaction>>> {
let Some(sender_id) = self.sender_id(&sender) else { return Vec::new() };
let sender_id = self.get_sender_id(sender);
let removed = self.pool.write().remove_transactions_by_sender(sender_id);
self.with_event_listener(|listener| listener.discarded_many(&removed));
@@ -1136,7 +1123,7 @@ where
&self,
sender: Address,
) -> Vec<Arc<ValidPoolTransaction<T::Transaction>>> {
let Some(sender_id) = self.sender_id(&sender) else { return Vec::new() };
let sender_id = self.get_sender_id(sender);
self.get_pool_data().get_transactions_by_sender(sender_id)
}
@@ -1146,7 +1133,7 @@ where
sender: Address,
nonce: u64,
) -> Option<Arc<ValidPoolTransaction<T::Transaction>>> {
let sender_id = self.sender_id(&sender)?;
let sender_id = self.get_sender_id(sender);
self.get_pool_data().get_pending_transaction_by_sender_and_nonce(sender_id, nonce)
}
@@ -1155,7 +1142,7 @@ where
&self,
sender: Address,
) -> Vec<Arc<ValidPoolTransaction<T::Transaction>>> {
let Some(sender_id) = self.sender_id(&sender) else { return Vec::new() };
let sender_id = self.get_sender_id(sender);
self.get_pool_data().queued_txs_by_sender(sender_id)
}
@@ -1172,7 +1159,7 @@ where
&self,
sender: Address,
) -> Vec<Arc<ValidPoolTransaction<T::Transaction>>> {
let Some(sender_id) = self.sender_id(&sender) else { return Vec::new() };
let sender_id = self.get_sender_id(sender);
self.get_pool_data().pending_txs_by_sender(sender_id)
}
@@ -1181,7 +1168,7 @@ where
&self,
sender: Address,
) -> Option<Arc<ValidPoolTransaction<T::Transaction>>> {
let sender_id = self.sender_id(&sender)?;
let sender_id = self.get_sender_id(sender);
self.get_pool_data().get_highest_transaction_by_sender(sender_id)
}
@@ -1191,7 +1178,7 @@ where
sender: Address,
on_chain_nonce: u64,
) -> Option<Arc<ValidPoolTransaction<T::Transaction>>> {
let sender_id = self.sender_id(&sender)?;
let sender_id = self.get_sender_id(sender);
self.get_pool_data().get_highest_consecutive_transaction_by_sender(
sender_id.into_transaction_id(on_chain_nonce),
)
@@ -1760,20 +1747,4 @@ mod tests {
let identifiers = test_pool.identifiers.read();
assert_eq!(identifiers.sender_id(&auth), Some(SenderId::from(1)));
}
#[test]
fn sender_queries_do_not_allocate_ids_for_unknown_addresses() {
let test_pool = &TestPoolBuilder::default().with_config(Default::default()).pool;
let sender = Address::new([9; 20]);
assert_eq!(test_pool.sender_id(&sender), None);
assert!(test_pool.get_transactions_by_sender(sender).is_empty());
assert!(test_pool.get_pending_transaction_by_sender_and_nonce(sender, 0).is_none());
assert!(test_pool.get_queued_transactions_by_sender(sender).is_empty());
assert!(test_pool.get_pending_transactions_by_sender(sender).is_empty());
assert!(test_pool.get_highest_transaction_by_sender(sender).is_none());
assert!(test_pool.get_highest_consecutive_transaction_by_sender(sender, 0).is_none());
assert!(test_pool.remove_transactions_by_sender(sender).is_empty());
assert_eq!(test_pool.sender_id(&sender), None);
}
}

View File

@@ -3104,35 +3104,6 @@ mod tests {
assert_eq!(pool.len(), 1);
}
#[test]
fn insert_replace_underpriced_rounds_up_minimum_bump() {
let on_chain_balance = U256::ZERO;
let on_chain_nonce = 0;
let mut f = MockTransactionFactory::default();
let mut pool = AllTransactions { minimal_protocol_basefee: 0, ..Default::default() };
let mut tx = MockTransaction::eip1559().inc_price().inc_limit();
tx.set_priority_fee(1);
tx.set_max_fee(1);
let first = f.validated(tx.clone());
let _ = pool.insert_tx(first.clone(), on_chain_balance, on_chain_nonce).unwrap();
let mut replacement = f.validated(tx.rng_hash().inc_price());
replacement.transaction.set_priority_fee(1);
replacement.transaction.set_max_fee(2);
let err =
pool.insert_tx(replacement.clone(), on_chain_balance, on_chain_nonce).unwrap_err();
assert!(matches!(err, InsertErr::Underpriced { .. }));
assert!(pool.contains(first.hash()));
assert_eq!(pool.len(), 1);
replacement.transaction.set_priority_fee(2);
replacement.transaction.set_max_fee(2);
let replaced = pool.insert_tx(replacement, on_chain_balance, on_chain_nonce).unwrap();
assert!(replaced.replaced_tx.is_some());
assert_eq!(pool.len(), 1);
}
#[test]
fn insert_conflicting_type_normal_to_blob() {
let on_chain_balance = U256::from(10_000);

View File

@@ -455,11 +455,9 @@ impl<T: PoolTransaction> ValidPoolTransaction<T> {
//
// The bump is different for EIP-4844 and other transactions. See `PriceBumpConfig`.
let price_bump = price_bumps.price_bump(self.tx_type());
let required_bumped_fee =
|existing_fee: u128| existing_fee.saturating_mul(100 + price_bump).div_ceil(100);
// Check if the max fee per gas is underpriced.
if maybe_replacement.max_fee_per_gas() < required_bumped_fee(self.max_fee_per_gas()) {
if maybe_replacement.max_fee_per_gas() < self.max_fee_per_gas() * (100 + price_bump) / 100 {
return true
}
@@ -472,7 +470,7 @@ impl<T: PoolTransaction> ValidPoolTransaction<T> {
if existing_max_priority_fee_per_gas != 0 &&
replacement_max_priority_fee_per_gas != 0 &&
replacement_max_priority_fee_per_gas <
required_bumped_fee(existing_max_priority_fee_per_gas)
existing_max_priority_fee_per_gas * (100 + price_bump) / 100
{
return true
}
@@ -482,7 +480,8 @@ impl<T: PoolTransaction> ValidPoolTransaction<T> {
// This enforces that blob txs can only be replaced by blob txs
let replacement_max_blob_fee_per_gas =
maybe_replacement.transaction.max_fee_per_blob_gas().unwrap_or_default();
if replacement_max_blob_fee_per_gas < required_bumped_fee(existing_max_blob_fee_per_gas)
if replacement_max_blob_fee_per_gas <
existing_max_blob_fee_per_gas * (100 + price_bump) / 100
{
return true
}

View File

@@ -1,13 +1,19 @@
use super::{
branch_child_idx::{BranchChildIdx, BranchChildIter},
ArenaSparseNode, ArenaSparseNodeBranchChild, ArenaSparseNodeState, Index, NodeArena,
ArenaSparseNode, ArenaSparseNodeBranchChild, ArenaSparseNodeState,
};
use alloc::vec::Vec;
use reth_trie_common::Nibbles;
use slotmap::{DefaultKey, SlotMap};
use tracing::{instrument, trace};
const TRACE_TARGET: &str = "trie::arena::cursor";
/// Alias for the slotmap key type used as node references throughout the arena trie.
type Index = DefaultKey;
/// Alias for the slotmap used as the node arena throughout the arena trie.
type NodeArena = SlotMap<Index, ArenaSparseNode>;
/// An entry on the cursor's traversal stack, tracking an ancestor node during trie walks.
#[derive(Debug, Clone)]
pub(super) struct ArenaCursorStackEntry {
@@ -87,14 +93,17 @@ impl ArenaCursor {
self.stack.len() - 1
}
/// Replaces the root entry on the stack with a new one.
///
/// The stack must contain exactly the root (depth 0) or be empty (freshly constructed).
/// Returns `true` if the stack is empty.
pub(super) const fn is_empty(&self) -> bool {
self.stack.is_empty()
}
/// Clears the traversal stack and pushes the given root entry.
#[instrument(level = "trace", target = TRACE_TARGET, skip(self, arena))]
pub(super) fn reset(&mut self, arena: &NodeArena, idx: Index, path: Nibbles) {
debug_assert!(
self.stack.len() <= 1 && !self.needs_pop,
"cursor must be drained before reset; stack has {} entries, needs_pop={}",
self.stack.is_empty() && !self.needs_pop,
"cursor must be fully drained before reset; stack has {} entries, needs_pop={}",
self.stack.len(),
self.needs_pop,
);
@@ -141,15 +150,18 @@ impl ArenaCursor {
_ => false,
});
if child_is_dirty {
*arena[parent.index].state_mut() = ArenaSparseNodeState::Dirty;
let parent_state = arena[parent.index].state_mut();
if !matches!(parent_state, ArenaSparseNodeState::Dirty) {
*parent_state = ArenaSparseNodeState::Dirty;
}
}
}
entry
}
/// Drains the stack down to the root, propagating dirty state from each popped entry
/// to its parent. The root entry remains on the stack (there is no parent to propagate to).
/// Drains the stack, propagating dirty state from each entry to its parent,
/// then removes the final (root) entry.
#[instrument(level = "trace", target = TRACE_TARGET, skip_all)]
pub(super) fn drain(&mut self, arena: &mut NodeArena) {
trace!(target: TRACE_TARGET, "Draining stack");
@@ -157,6 +169,7 @@ impl ArenaCursor {
while self.stack.len() > 1 {
self.pop(arena);
}
self.stack.clear();
}
/// Returns the logical path of the branch at the top of the stack.

View File

@@ -1623,25 +1623,29 @@ impl ArenaParallelSparseTrie {
let child_nibble = head_path.last().expect("non-root leaf");
let parent_branch = arena[parent_idx].branch_ref();
if parent_branch.state_mask.count_bits() == 2 &&
parent_branch.sibling_child(child_nibble).is_blinded()
{
let sibling_nibble = parent_branch
.state_mask
.iter()
.find(|&n| n != child_nibble)
.expect("branch has two children");
let mut sibling_path = cursor.parent_logical_branch_path(arena);
sibling_path.push_unchecked(sibling_nibble);
trace!(target: TRACE_TARGET, ?full_path, ?sibling_path, "Removal would collapse branch onto blinded sibling, requesting proof");
return (
RemoveLeafResult::NeedsProof {
key,
proof_key: Self::nibbles_to_padded_b256(&sibling_path),
min_len: (sibling_path.len() as u8).min(64),
},
SubtrieCounterDeltas::default(),
);
if parent_branch.state_mask.count_bits() == 2 {
let child_idx = BranchChildIdx::new(parent_branch.state_mask, child_nibble)
.expect("leaf nibble not found in parent state_mask");
// With exactly 2 bits set the dense array has indices 0 and 1.
let sibling_dense_idx = 1 - child_idx.get();
if parent_branch.children[sibling_dense_idx].is_blinded() {
let sibling_nibble = parent_branch
.state_mask
.iter()
.find(|&n| n != child_nibble)
.expect("branch has two children");
let mut sibling_path = cursor.parent_logical_branch_path(arena);
sibling_path.push_unchecked(sibling_nibble);
trace!(target: TRACE_TARGET, ?full_path, ?sibling_path, "Removal would collapse branch onto blinded sibling, requesting proof");
return (
RemoveLeafResult::NeedsProof {
key,
proof_key: Self::nibbles_to_padded_b256(&sibling_path),
min_len: (sibling_path.len() as u8).min(64),
},
SubtrieCounterDeltas::default(),
);
}
}
}
@@ -1649,7 +1653,10 @@ impl ArenaParallelSparseTrie {
let removed_was_dirty =
matches!(arena[head_idx].state_ref(), Some(ArenaSparseNodeState::Dirty));
if cursor.depth() == 0 {
// Pop the leaf entry, propagating dirty state to the parent.
cursor.pop(arena);
if cursor.is_empty() {
// The leaf is the root — replace with EmptyRoot and reset the cursor
// so subsequent iterations can call seek normally.
arena.remove(head_idx);
@@ -1664,9 +1671,6 @@ impl ArenaParallelSparseTrie {
);
}
// Pop the leaf entry, propagating dirty state to the parent.
cursor.pop(arena);
// The parent must be a branch. Remove the leaf from it.
let parent_entry = cursor.head().expect("cursor is non-empty");
let parent_idx = parent_entry.index;
@@ -1734,7 +1738,10 @@ impl ArenaParallelSparseTrie {
return None;
}
if !parent_branch.sibling_child(child_nibble).is_blinded() {
let child_idx = BranchChildIdx::new(parent_branch.state_mask, child_nibble)
.expect("child nibble not in parent state_mask");
let sibling_dense_idx = 1 - child_idx.get();
if !parent_branch.children[sibling_dense_idx].is_blinded() {
return None;
}
@@ -2127,6 +2134,21 @@ impl ArenaParallelSparseTrie {
Self::merge_subtrie_updates(&mut self.buffers.updates, &mut subtrie.buffers.updates);
}
/// Returns true if the current `update_leaves` batch should defer revealed subtries for
/// parallel processing. Will always return false in nostd builds.
const fn is_update_leaves_parallelism_enabled(&self, num_updates: usize) -> bool {
#[cfg(not(feature = "std"))]
{
let _ = num_updates;
false
}
#[cfg(feature = "std")]
{
num_updates >= self.parallelism_thresholds.min_updates
}
}
}
impl SparseTrie for ArenaParallelSparseTrie {
@@ -2756,7 +2778,7 @@ impl SparseTrie for ArenaParallelSparseTrie {
updates.drain().map(|(key, update)| (key, Nibbles::unpack(key), update)).collect();
sorted.sort_unstable_by_key(|entry| entry.1);
let threshold = self.parallelism_thresholds.min_updates;
let parallelize_subtries = self.is_update_leaves_parallelism_enabled(sorted.len());
let mut cursor = mem::take(&mut self.buffers.cursor);
cursor.reset(&self.upper_arena, self.root, Nibbles::default());
@@ -2817,8 +2839,10 @@ impl SparseTrie for ArenaParallelSparseTrie {
let num_subtrie_updates = update_idx - subtrie_start;
if num_subtrie_updates >= threshold {
// Take subtrie for parallel update.
if parallelize_subtries {
// Take subtrie for deferred update. Once the batch-level threshold is
// met, even small subtries are processed together so distributed work
// does not get stranded on the serial path.
trace!(target: TRACE_TARGET, ?subtrie_root_path, num_subtrie_updates, "Taking subtrie for parallel update");
let ArenaSparseNode::Subtrie(subtrie) = mem::replace(
&mut self.upper_arena[child_idx],
@@ -3184,6 +3208,22 @@ mod tests {
changeset
}
#[test]
fn update_leaves_parallelism_threshold_is_batch_scoped() {
let trie = ArenaParallelSparseTrie::default().with_parallelism_thresholds(
ArenaParallelismThresholds {
min_dirty_leaves: 64,
min_revealed_nodes: 16,
min_updates: 8,
min_leaves_for_prune: 128,
},
);
assert!(!trie.is_update_leaves_parallelism_enabled(7));
assert!(trie.is_update_leaves_parallelism_enabled(8));
assert!(trie.is_update_leaves_parallelism_enabled(32));
}
proptest! {
#![proptest_config(ProptestConfig::with_cases(1000))]
#[test]

View File

@@ -1,13 +1,19 @@
use super::{
branch_child_idx::{BranchChildIdx, BranchChildIter},
ArenaSparseSubtrie, Index, NodeArena,
ArenaSparseSubtrie,
};
use alloc::{boxed::Box, vec::Vec};
use alloy_primitives::{keccak256, B256};
use alloy_trie::{BranchNodeCompact, TrieMask};
use reth_trie_common::{BranchNodeMasks, Nibbles, ProofTrieNodeV2, RlpNode, TrieNodeV2};
use slotmap::{DefaultKey, SlotMap};
use smallvec::SmallVec;
/// Alias for the slotmap key type used as node references throughout the arena trie.
type Index = DefaultKey;
/// Alias for the slotmap used as the node arena throughout the arena trie.
type NodeArena = SlotMap<Index, ArenaSparseNode>;
/// Tracks whether a node's RLP encoding is cached or needs recomputation.
#[derive(Debug, Clone, PartialEq, Eq)]
pub(super) enum ArenaSparseNodeState {
@@ -100,23 +106,6 @@ impl ArenaSparseNodeBranch {
self.state = ArenaSparseNodeState::Dirty;
}
/// Returns a reference to the sibling child in a branch with exactly 2 children.
///
/// # Panics
///
/// Panics (debug) if the branch does not have exactly 2 children, or if `nibble` is not set.
pub(super) fn sibling_child(&self, nibble: u8) -> &ArenaSparseNodeBranchChild {
debug_assert_eq!(
self.state_mask.count_bits(),
2,
"sibling_child requires exactly 2 children"
);
let child_idx =
BranchChildIdx::new(self.state_mask, nibble).expect("nibble not found in state_mask");
// With exactly 2 children the dense array has indices 0 and 1.
&self.children[1 - child_idx.get()]
}
/// Iterates over `(nibble, &ArenaSparseNodeBranchChild)` pairs in nibble order.
pub(super) fn child_iter(
&self,

View File

@@ -50,7 +50,7 @@ group "default" {
}
group "nightly" {
targets = ["ethereum", "ethereum-profiling"]
targets = ["ethereum", "ethereum-profiling", "ethereum-edge-profiling"]
}
// Base target with shared configuration
@@ -100,6 +100,17 @@ target "ethereum-profiling" {
tags = ["${REGISTRY}/reth:nightly-profiling"]
}
target "ethereum-edge-profiling" {
inherits = ["_base_profiling"]
args = {
BINARY = "reth"
MANIFEST_PATH = "bin/reth"
BUILD_PROFILE = "profiling"
FEATURES = "jemalloc-prof edge"
}
tags = ["${REGISTRY}/reth:nightly-edge-profiling"]
}
// Hive test targets single-platform, hivetests profile, tar output
target "_base_hive" {
inherits = ["_base"]
@@ -113,7 +124,7 @@ variable "HIVE_OUTPUT_DIR" {
default = "./artifacts"
}
target "hive" {
target "hive-stable" {
inherits = ["_base_hive"]
args = {
BINARY = "reth"
@@ -123,6 +134,17 @@ target "hive" {
output = ["type=docker,dest=${HIVE_OUTPUT_DIR}/reth_image.tar"]
}
target "hive-edge" {
inherits = ["_base_hive"]
args = {
BINARY = "reth"
MANIFEST_PATH = "bin/reth"
FEATURES = "edge"
}
tags = ["reth:local"]
output = ["type=docker,dest=${HIVE_OUTPUT_DIR}/reth_image.tar"]
}
// Kurtosis test target
target "kurtosis" {
inherits = ["_base_hive"]

File diff suppressed because it is too large Load Diff

View File

@@ -3,7 +3,7 @@ name: reth
services:
lighthouse:
restart: unless-stopped
image: sigp/lighthouse:v8.1.2
image: sigp/lighthouse:v8.0.1
depends_on:
- reth
ports: