mirror of
https://github.com/paradigmxyz/reth.git
synced 2026-04-30 03:01:58 -04:00
Compare commits
1 Commits
yk/shared-
...
pep/batch-
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
c825c7f7a4 |
@@ -1,5 +0,0 @@
|
||||
---
|
||||
reth-trie-sparse: patch
|
||||
---
|
||||
|
||||
Refactored arena trie internals by adding a `BranchChildIdx::sibling()` helper, deduplicating `Index`/`NodeArena` type aliases, and replacing `is_empty()` with a `drop_root()` method. Fixed a bug where `cursor.pop()` was called before checking if the leaf was the root node, which could cause incorrect dirty-state propagation.
|
||||
1
.github/actionlint.yaml
vendored
1
.github/actionlint.yaml
vendored
@@ -5,4 +5,3 @@ self-hosted-runner:
|
||||
- depot-ubuntu-latest-4
|
||||
- depot-ubuntu-latest-8
|
||||
- depot-ubuntu-latest-16
|
||||
- available
|
||||
|
||||
27
.github/scripts/bench-reth-run.sh
vendored
27
.github/scripts/bench-reth-run.sh
vendored
@@ -11,8 +11,6 @@
|
||||
# BENCH_WAIT_TIME (duration like 500ms, default empty)
|
||||
# BENCH_BASELINE_ARGS (extra reth node args for baseline runs)
|
||||
# BENCH_FEATURE_ARGS (extra reth node args for feature runs)
|
||||
# BENCH_OTLP_TRACES_ENDPOINT (OTLP HTTP endpoint for traces, e.g. https://host/insert/opentelemetry/v1/traces)
|
||||
# BENCH_OTLP_LOGS_ENDPOINT (OTLP HTTP endpoint for logs, e.g. https://host/insert/opentelemetry/v1/logs)
|
||||
set -euo pipefail
|
||||
|
||||
LABEL="$1"
|
||||
@@ -141,14 +139,6 @@ if [ -n "${BENCH_METRICS_ADDR:-}" ]; then
|
||||
RETH_ARGS+=(--metrics "$BENCH_METRICS_ADDR")
|
||||
fi
|
||||
|
||||
# OTLP traces and logs export
|
||||
if [ -n "${BENCH_OTLP_TRACES_ENDPOINT:-}" ]; then
|
||||
RETH_ARGS+=(--tracing-otlp="${BENCH_OTLP_TRACES_ENDPOINT}" --tracing-otlp.service-name=reth-bench)
|
||||
fi
|
||||
if [ -n "${BENCH_OTLP_LOGS_ENDPOINT:-}" ]; then
|
||||
RETH_ARGS+=(--logs-otlp="${BENCH_OTLP_LOGS_ENDPOINT}" --logs-otlp.filter=debug)
|
||||
fi
|
||||
|
||||
# Tracy profiling: add --log.tracy flags and set environment
|
||||
if [ "${BENCH_TRACY:-off}" != "off" ]; then
|
||||
RETH_ARGS+=(--log.tracy --log.tracy.filter "${BENCH_TRACY_FILTER:-debug}")
|
||||
@@ -159,29 +149,16 @@ if [ "${BENCH_TRACY:-off}" != "off" ]; then
|
||||
fi
|
||||
fi
|
||||
|
||||
SUDO_ENV=()
|
||||
if [ -n "${OTEL_RESOURCE_ATTRIBUTES:-}" ]; then
|
||||
SUDO_ENV+=("OTEL_RESOURCE_ATTRIBUTES=${OTEL_RESOURCE_ATTRIBUTES}")
|
||||
SUDO_ENV+=("OTEL_BSP_MAX_QUEUE_SIZE=65536" "OTEL_BLRP_MAX_QUEUE_SIZE=65536")
|
||||
fi
|
||||
|
||||
# Limit reth memory to 95% of available RAM to prevent OOM kills
|
||||
TOTAL_MEM_KB=$(awk '/^MemTotal:/ {print $2}' /proc/meminfo)
|
||||
MEM_LIMIT=$(( TOTAL_MEM_KB * 95 / 100 * 1024 ))
|
||||
echo "Memory limit: $(( MEM_LIMIT / 1024 / 1024 ))MB (95% of $(( TOTAL_MEM_KB / 1024 ))MB)"
|
||||
|
||||
if [ "${BENCH_SAMPLY:-false}" = "true" ]; then
|
||||
RETH_ARGS+=(--log.samply)
|
||||
SAMPLY="$(which samply)"
|
||||
sudo systemd-run --scope -p MemoryMax="$MEM_LIMIT" -p AllowedCPUs="$RETH_CPUS" \
|
||||
env "${SUDO_ENV[@]}" nice -n -20 \
|
||||
sudo taskset -c "$RETH_CPUS" nice -n -20 \
|
||||
"$SAMPLY" record --save-only --presymbolicate --rate 10000 \
|
||||
--output "$OUTPUT_DIR/samply-profile.json.gz" \
|
||||
-- "$BINARY" "${RETH_ARGS[@]}" \
|
||||
> "$LOG" 2>&1 &
|
||||
else
|
||||
sudo systemd-run --scope -p MemoryMax="$MEM_LIMIT" -p AllowedCPUs="$RETH_CPUS" \
|
||||
env "${SUDO_ENV[@]}" nice -n -20 "$BINARY" "${RETH_ARGS[@]}" \
|
||||
sudo taskset -c "$RETH_CPUS" nice -n -20 "$BINARY" "${RETH_ARGS[@]}" \
|
||||
> "$LOG" 2>&1 &
|
||||
fi
|
||||
|
||||
|
||||
10
.github/scripts/bench-slack-notify.js
vendored
10
.github/scripts/bench-slack-notify.js
vendored
@@ -7,8 +7,6 @@
|
||||
// BENCH_PR – PR number (may be empty)
|
||||
// BENCH_ACTOR – GitHub user who triggered the bench
|
||||
// BENCH_JOB_URL – URL to the Actions job page
|
||||
// BENCH_BASELINE_ARGS – Extra CLI args for the baseline reth node
|
||||
// BENCH_FEATURE_ARGS – Extra CLI args for the feature reth node
|
||||
// BENCH_SAMPLY – 'true' if samply profiling was enabled
|
||||
//
|
||||
// Usage from actions/github-script:
|
||||
@@ -134,13 +132,7 @@ function buildSuccessBlocks({ summary, prNumber, actor, actorSlackId, jobUrl, re
|
||||
if (cores !== '0') countsParts.push(`*Cores:* ${cores}`);
|
||||
const countsLine = countsParts.join(' | ');
|
||||
|
||||
const baselineArgs = process.env.BENCH_BASELINE_ARGS || '';
|
||||
const featureArgs = process.env.BENCH_FEATURE_ARGS || '';
|
||||
const argsLines = [];
|
||||
if (baselineArgs) argsLines.push(`*Baseline Args:* \`${baselineArgs}\``);
|
||||
if (featureArgs) argsLines.push(`*Feature Args:* \`${featureArgs}\``);
|
||||
|
||||
const sectionText = [metaParts.join(' | '), '', baselineLine, featureLine, ...argsLines, countsLine].join('\n');
|
||||
const sectionText = [metaParts.join(' | '), '', baselineLine, featureLine, countsLine].join('\n');
|
||||
|
||||
// Action buttons
|
||||
const diffUrl = `https://github.com/${repo}/compare/${summary.baseline.ref}...${summary.feature.ref}`;
|
||||
|
||||
10
.github/workflows/bench.yml
vendored
10
.github/workflows/bench.yml
vendored
@@ -476,7 +476,7 @@ jobs:
|
||||
reth-bench:
|
||||
needs: reth-bench-ack
|
||||
name: reth-bench
|
||||
runs-on: [self-hosted, Linux, X64, available]
|
||||
runs-on: [self-hosted, Linux, X64]
|
||||
timeout-minutes: 120
|
||||
env:
|
||||
BENCH_RPC_URL: https://ethereum.reth.rs/rpc
|
||||
@@ -497,8 +497,6 @@ jobs:
|
||||
BENCH_COMMENT_ID: ${{ needs.reth-bench-ack.outputs.comment-id }}
|
||||
BENCH_NO_SLACK: ${{ needs.reth-bench-ack.outputs.no-slack }}
|
||||
BENCH_METRICS_ADDR: "127.0.0.1:9100"
|
||||
BENCH_OTLP_TRACES_ENDPOINT: ${{ secrets.BENCH_OTLP_TRACES_ENDPOINT }}
|
||||
BENCH_OTLP_LOGS_ENDPOINT: ${{ secrets.BENCH_OTLP_LOGS_ENDPOINT }}
|
||||
steps:
|
||||
- name: Clean up previous bench-work
|
||||
run: sudo rm -rf "$BENCH_WORK_DIR" 2>/dev/null || true
|
||||
@@ -880,7 +878,6 @@ jobs:
|
||||
id: run-baseline-1
|
||||
env:
|
||||
BASELINE_REF: ${{ steps.refs.outputs.baseline-ref }}
|
||||
OTEL_RESOURCE_ATTRIBUTES: "benchmark_id=${{ env.BENCH_ID }},benchmark_run=baseline-1,run_type=baseline,git_ref=${{ steps.refs.outputs.baseline-ref }}"
|
||||
run: |
|
||||
cat > "$BENCH_LABELS_FILE" <<LABELS
|
||||
{"benchmark_run":"baseline-1","run_type":"baseline","git_ref":"${BASELINE_REF}","bench_sha":"${BASELINE_REF}","benchmark_id":"${BENCH_ID}","run_start_epoch":"$(date +%s)","reference_epoch":"${BENCH_REFERENCE_EPOCH}"}
|
||||
@@ -891,7 +888,6 @@ jobs:
|
||||
id: run-feature-1
|
||||
env:
|
||||
FEATURE_REF: ${{ steps.refs.outputs.feature-ref }}
|
||||
OTEL_RESOURCE_ATTRIBUTES: "benchmark_id=${{ env.BENCH_ID }},benchmark_run=feature-1,run_type=feature,git_ref=${{ steps.refs.outputs.feature-ref }}"
|
||||
run: |
|
||||
cat > "$BENCH_LABELS_FILE" <<LABELS
|
||||
{"benchmark_run":"feature-1","run_type":"feature","git_ref":"${FEATURE_REF}","bench_sha":"${FEATURE_REF}","benchmark_id":"${BENCH_ID}","run_start_epoch":"$(date +%s)","reference_epoch":"${BENCH_REFERENCE_EPOCH}"}
|
||||
@@ -899,11 +895,9 @@ jobs:
|
||||
taskset -c 0 .github/scripts/bench-reth-run.sh feature ../reth-feature/target/profiling/reth "$BENCH_WORK_DIR/feature-1"
|
||||
|
||||
- name: "Run benchmark: feature (2/2)"
|
||||
if: env.BENCH_ABBA != 'false'
|
||||
id: run-feature-2
|
||||
env:
|
||||
FEATURE_REF: ${{ steps.refs.outputs.feature-ref }}
|
||||
OTEL_RESOURCE_ATTRIBUTES: "benchmark_id=${{ env.BENCH_ID }},benchmark_run=feature-2,run_type=feature,git_ref=${{ steps.refs.outputs.feature-ref }}"
|
||||
run: |
|
||||
cat > "$BENCH_LABELS_FILE" <<LABELS
|
||||
{"benchmark_run":"feature-2","run_type":"feature","git_ref":"${FEATURE_REF}","bench_sha":"${FEATURE_REF}","benchmark_id":"${BENCH_ID}","run_start_epoch":"$(date +%s)","reference_epoch":"${BENCH_REFERENCE_EPOCH}"}
|
||||
@@ -911,11 +905,9 @@ jobs:
|
||||
taskset -c 0 .github/scripts/bench-reth-run.sh feature ../reth-feature/target/profiling/reth "$BENCH_WORK_DIR/feature-2"
|
||||
|
||||
- name: "Run benchmark: baseline (2/2)"
|
||||
if: env.BENCH_ABBA != 'false'
|
||||
id: run-baseline-2
|
||||
env:
|
||||
BASELINE_REF: ${{ steps.refs.outputs.baseline-ref }}
|
||||
OTEL_RESOURCE_ATTRIBUTES: "benchmark_id=${{ env.BENCH_ID }},benchmark_run=baseline-2,run_type=baseline,git_ref=${{ steps.refs.outputs.baseline-ref }}"
|
||||
run: |
|
||||
LAST_RUN_START=$(date +%s)
|
||||
echo "BENCH_LAST_RUN_START=${LAST_RUN_START}" >> "$GITHUB_ENV"
|
||||
|
||||
2
.github/workflows/docker-test.yml
vendored
2
.github/workflows/docker-test.yml
vendored
@@ -6,7 +6,7 @@ on:
|
||||
hive_target:
|
||||
required: true
|
||||
type: string
|
||||
description: "Docker bake target to build (e.g. hive)"
|
||||
description: "Docker bake target to build (e.g. hive-stable, hive-edge)"
|
||||
artifact_name:
|
||||
required: false
|
||||
type: string
|
||||
|
||||
30
.github/workflows/docker.yml
vendored
30
.github/workflows/docker.yml
vendored
@@ -28,11 +28,6 @@ on:
|
||||
required: false
|
||||
type: boolean
|
||||
default: false
|
||||
pgo:
|
||||
description: "Enable PGO profiling"
|
||||
required: false
|
||||
type: boolean
|
||||
default: false
|
||||
pgo_blocks:
|
||||
description: "Number of blocks to execute for PGO profiling"
|
||||
required: false
|
||||
@@ -41,14 +36,14 @@ on:
|
||||
|
||||
jobs:
|
||||
collect-pgo-profile:
|
||||
if: github.repository == 'paradigmxyz/reth' && github.event_name == 'workflow_dispatch' && inputs.pgo
|
||||
if: github.repository == 'paradigmxyz/reth'
|
||||
uses: ./.github/workflows/pgo-profile.yml
|
||||
with:
|
||||
pgo_blocks: ${{ inputs.pgo_blocks || '20' }}
|
||||
pgo_blocks: ${{ github.event_name == 'workflow_dispatch' && inputs.pgo_blocks || '20' }}
|
||||
secrets: inherit
|
||||
|
||||
build:
|
||||
if: github.repository == 'paradigmxyz/reth' && !failure() && !cancelled()
|
||||
if: github.repository == 'paradigmxyz/reth'
|
||||
name: Build Docker images
|
||||
runs-on: ubuntu-24.04
|
||||
needs: collect-pgo-profile
|
||||
@@ -77,7 +72,6 @@ jobs:
|
||||
echo "dirty=false" >> "$GITHUB_OUTPUT"
|
||||
|
||||
- name: Download pre-collected PGO profile
|
||||
if: ${{ github.event_name == 'workflow_dispatch' && inputs.pgo }}
|
||||
uses: actions/download-artifact@v7
|
||||
with:
|
||||
name: pgo-profdata
|
||||
@@ -86,19 +80,13 @@ jobs:
|
||||
- name: Configure PGO build args
|
||||
id: pgo
|
||||
run: |
|
||||
if [[ "${{ github.event_name }}" == "workflow_dispatch" ]] && [[ "${{ inputs.pgo }}" == "true" ]]; then
|
||||
if [ ! -f dist/merged.profdata ]; then
|
||||
echo "::error::Expected dist/merged.profdata from collect-pgo-profile job"
|
||||
exit 1
|
||||
fi
|
||||
echo "use_pgo_bolt=true" >> "$GITHUB_OUTPUT"
|
||||
echo "pgo_profdata=dist/merged.profdata" >> "$GITHUB_OUTPUT"
|
||||
echo "Using pre-collected PGO profile from collect-pgo-profile job"
|
||||
else
|
||||
echo "use_pgo_bolt=false" >> "$GITHUB_OUTPUT"
|
||||
echo "pgo_profdata=" >> "$GITHUB_OUTPUT"
|
||||
echo "PGO disabled"
|
||||
if [ ! -f dist/merged.profdata ]; then
|
||||
echo "::error::Expected dist/merged.profdata from collect-pgo-profile job"
|
||||
exit 1
|
||||
fi
|
||||
echo "use_pgo_bolt=true" >> "$GITHUB_OUTPUT"
|
||||
echo "pgo_profdata=dist/merged.profdata" >> "$GITHUB_OUTPUT"
|
||||
echo "Using pre-collected PGO profile from collect-pgo-profile job"
|
||||
|
||||
- name: Determine build parameters
|
||||
id: params
|
||||
|
||||
2
.github/workflows/e2e.yml
vendored
2
.github/workflows/e2e.yml
vendored
@@ -63,6 +63,6 @@ jobs:
|
||||
run: |
|
||||
cargo nextest run \
|
||||
--no-fail-fast \
|
||||
--locked \
|
||||
--locked --features "edge" \
|
||||
-p reth-e2e-test-utils \
|
||||
-E 'binary(rocksdb)'
|
||||
|
||||
21
.github/workflows/hive.yml
vendored
21
.github/workflows/hive.yml
vendored
@@ -15,11 +15,18 @@ concurrency:
|
||||
cancel-in-progress: true
|
||||
|
||||
jobs:
|
||||
build-reth:
|
||||
build-reth-stable:
|
||||
uses: ./.github/workflows/docker-test.yml
|
||||
with:
|
||||
hive_target: hive
|
||||
artifact_name: "reth"
|
||||
hive_target: hive-stable
|
||||
artifact_name: "reth-stable"
|
||||
secrets: inherit
|
||||
|
||||
build-reth-edge:
|
||||
uses: ./.github/workflows/docker-test.yml
|
||||
with:
|
||||
hive_target: hive-edge
|
||||
artifact_name: "reth-edge"
|
||||
secrets: inherit
|
||||
|
||||
prepare-hive:
|
||||
@@ -77,6 +84,7 @@ jobs:
|
||||
strategy:
|
||||
fail-fast: false
|
||||
matrix:
|
||||
storage: [stable, edge]
|
||||
# ethereum/rpc to be deprecated:
|
||||
# https://github.com/ethereum/hive/pull/1117
|
||||
scenario:
|
||||
@@ -176,9 +184,10 @@ jobs:
|
||||
- sim: ethereum/eels/consume-rlp
|
||||
limit: .*tests/paris.*
|
||||
needs:
|
||||
- build-reth
|
||||
- build-reth-stable
|
||||
- build-reth-edge
|
||||
- prepare-hive
|
||||
name: ${{ matrix.scenario.sim }}${{ matrix.scenario.limit && format(' - {0}', matrix.scenario.limit) }}
|
||||
name: ${{ matrix.storage }} / ${{ matrix.scenario.sim }}${{ matrix.scenario.limit && format(' - {0}', matrix.scenario.limit) }}
|
||||
# Use larger runners for eels tests to avoid OOM runner crashes
|
||||
runs-on: ${{ github.repository == 'paradigmxyz/reth' && (contains(matrix.scenario.sim, 'eels') && 'depot-ubuntu-latest-8' || 'depot-ubuntu-latest-4') || 'ubuntu-latest' }}
|
||||
permissions:
|
||||
@@ -197,7 +206,7 @@ jobs:
|
||||
- name: Download reth image
|
||||
uses: actions/download-artifact@v8
|
||||
with:
|
||||
name: reth
|
||||
name: reth-${{ matrix.storage }}
|
||||
path: /tmp
|
||||
|
||||
- name: Load Docker images
|
||||
|
||||
5
.github/workflows/integration.yml
vendored
5
.github/workflows/integration.yml
vendored
@@ -22,7 +22,7 @@ concurrency:
|
||||
|
||||
jobs:
|
||||
test:
|
||||
name: test / ${{ matrix.network }}
|
||||
name: test / ${{ matrix.network }} / ${{ matrix.storage }}
|
||||
if: github.event_name != 'schedule'
|
||||
runs-on: ${{ github.repository == 'paradigmxyz/reth' && 'depot-ubuntu-latest-4' || 'ubuntu-latest' }}
|
||||
env:
|
||||
@@ -30,6 +30,7 @@ jobs:
|
||||
strategy:
|
||||
matrix:
|
||||
network: ["ethereum"]
|
||||
storage: ["stable", "edge"]
|
||||
timeout-minutes: 60
|
||||
steps:
|
||||
- uses: actions/checkout@v6
|
||||
@@ -46,7 +47,7 @@ jobs:
|
||||
run: |
|
||||
cargo nextest run \
|
||||
--no-fail-fast \
|
||||
--locked --features "asm-keccak ${{ matrix.network }}" \
|
||||
--locked --features "asm-keccak ${{ matrix.network }} ${{ matrix.storage == 'edge' && 'edge' || '' }}" \
|
||||
--workspace --exclude ef-tests \
|
||||
-E "kind(test) and not binary(e2e_testsuite)"
|
||||
|
||||
|
||||
10
.github/workflows/release.yml
vendored
10
.github/workflows/release.yml
vendored
@@ -13,10 +13,6 @@ on:
|
||||
description: "Enable dry run mode (builds artifacts but skips uploads and release creation)"
|
||||
type: boolean
|
||||
default: false
|
||||
pgo:
|
||||
description: "Enable PGO profiling"
|
||||
type: boolean
|
||||
default: false
|
||||
pgo_blocks:
|
||||
description: "Number of blocks to execute for PGO profiling on self-hosted runner"
|
||||
type: string
|
||||
@@ -158,14 +154,12 @@ jobs:
|
||||
path: ${{ matrix.build.binary }}-${{ needs.extract-version.outputs.VERSION }}-${{ matrix.configs.target }}.tar.gz.asc
|
||||
|
||||
collect-pgo-profile:
|
||||
if: github.event_name == 'workflow_dispatch' && inputs.pgo
|
||||
uses: ./.github/workflows/pgo-profile.yml
|
||||
with:
|
||||
pgo_blocks: ${{ inputs.pgo_blocks || '20' }}
|
||||
pgo_blocks: ${{ github.event_name == 'workflow_dispatch' && inputs.pgo_blocks || '20' }}
|
||||
secrets: inherit
|
||||
|
||||
build-pgo:
|
||||
if: github.event_name == 'workflow_dispatch' && inputs.pgo
|
||||
name: build release (x86_64-linux PGO+BOLT)
|
||||
runs-on: [self-hosted, Linux, X64]
|
||||
needs: [extract-version, collect-pgo-profile]
|
||||
@@ -243,7 +237,7 @@ jobs:
|
||||
name: draft release
|
||||
runs-on: ubuntu-latest
|
||||
needs: [build, build-pgo, extract-version]
|
||||
if: ${{ !failure() && !cancelled() && github.event.inputs.dry_run != 'true' }}
|
||||
if: ${{ github.event.inputs.dry_run != 'true' }}
|
||||
env:
|
||||
VERSION: ${{ needs.extract-version.outputs.VERSION }}
|
||||
permissions:
|
||||
|
||||
8
.github/workflows/unit.yml
vendored
8
.github/workflows/unit.yml
vendored
@@ -19,13 +19,15 @@ concurrency:
|
||||
|
||||
jobs:
|
||||
test:
|
||||
name: test / ${{ matrix.type }}
|
||||
name: test / ${{ matrix.type }} / ${{ matrix.storage }}
|
||||
runs-on: ${{ github.repository == 'paradigmxyz/reth' && 'depot-ubuntu-latest-4' || 'ubuntu-latest' }}
|
||||
env:
|
||||
RUST_BACKTRACE: 1
|
||||
EDGE_FEATURES: ${{ matrix.storage == 'edge' && 'edge' || '' }}
|
||||
strategy:
|
||||
matrix:
|
||||
type: [ethereum]
|
||||
storage: [stable, edge]
|
||||
include:
|
||||
- type: ethereum
|
||||
features: asm-keccak ethereum
|
||||
@@ -48,14 +50,14 @@ jobs:
|
||||
run: |
|
||||
cargo nextest run \
|
||||
--no-fail-fast \
|
||||
--features "${{ matrix.features }}" --locked \
|
||||
--features "${{ matrix.features }} $EDGE_FEATURES" --locked \
|
||||
${{ matrix.exclude_args }} --workspace \
|
||||
--exclude ef-tests --no-tests=warn \
|
||||
-E "!kind(test) and not binary(e2e_testsuite)"
|
||||
|
||||
state:
|
||||
name: Ethereum state tests
|
||||
runs-on: ${{ github.repository == 'paradigmxyz/reth' && 'depot-ubuntu-latest-8' || 'ubuntu-latest' }}
|
||||
runs-on: ${{ github.repository == 'paradigmxyz/reth' && 'depot-ubuntu-latest-4' || 'ubuntu-latest' }}
|
||||
env:
|
||||
RUST_LOG: info,sync=error
|
||||
RUST_BACKTRACE: 1
|
||||
|
||||
158
Cargo.lock
generated
158
Cargo.lock
generated
@@ -1003,9 +1003,9 @@ checksum = "4b46cbb362ab8752921c97e041f5e366ee6297bd428a31275b9fcf1e380f7299"
|
||||
|
||||
[[package]]
|
||||
name = "anstream"
|
||||
version = "1.0.0"
|
||||
version = "0.6.21"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "824a212faf96e9acacdbd09febd34438f8f711fb84e09a8916013cd7815ca28d"
|
||||
checksum = "43d5b281e737544384e969a5ccad3f1cdd24b48086a0fc1b2a5262a26b8f4f4a"
|
||||
dependencies = [
|
||||
"anstyle",
|
||||
"anstyle-parse",
|
||||
@@ -1018,15 +1018,15 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "anstyle"
|
||||
version = "1.0.14"
|
||||
version = "1.0.13"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "940b3a0ca603d1eade50a4846a2afffd5ef57a9feac2c0e2ec2e14f9ead76000"
|
||||
checksum = "5192cca8006f1fd4f7237516f40fa183bb07f8fbdfedaa0036de5ea9b0b45e78"
|
||||
|
||||
[[package]]
|
||||
name = "anstyle-parse"
|
||||
version = "1.0.0"
|
||||
version = "0.2.7"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "52ce7f38b242319f7cabaa6813055467063ecdc9d355bbb4ce0c68908cd8130e"
|
||||
checksum = "4e7644824f0aa2c7b9384579234ef10eb7efb6a0deb83f9630a49594dd9c15c2"
|
||||
dependencies = [
|
||||
"utf8parse",
|
||||
]
|
||||
@@ -2026,9 +2026,9 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "c-kzg"
|
||||
version = "2.1.7"
|
||||
version = "2.1.6"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "6648ed1e4ea8e8a1a4a2c78e1cda29a3fd500bc622899c340d8525ea9a76b24a"
|
||||
checksum = "1a0f582957c24870b7bfd12bf562c40b4734b533cafbaf8ded31d6d85f462c01"
|
||||
dependencies = [
|
||||
"arbitrary",
|
||||
"blst",
|
||||
@@ -2113,9 +2113,9 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "cc"
|
||||
version = "1.2.57"
|
||||
version = "1.2.56"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "7a0dd1ca384932ff3641c8718a02769f1698e7563dc6974ffd03346116310423"
|
||||
checksum = "aebf35691d1bfb0ac386a69bac2fde4dd276fb618cf8bf4f5318fe285e821bb2"
|
||||
dependencies = [
|
||||
"find-msvc-tools",
|
||||
"jobserver",
|
||||
@@ -2214,9 +2214,9 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "clap"
|
||||
version = "4.6.0"
|
||||
version = "4.5.60"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "b193af5b67834b676abd72466a96c1024e6a6ad978a1f484bd90b85c94041351"
|
||||
checksum = "2797f34da339ce31042b27d23607e051786132987f595b02ba4f6a6dffb7030a"
|
||||
dependencies = [
|
||||
"clap_builder",
|
||||
"clap_derive",
|
||||
@@ -2224,9 +2224,9 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "clap_builder"
|
||||
version = "4.6.0"
|
||||
version = "4.5.60"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "714a53001bf66416adb0e2ef5ac857140e7dc3a0c48fb28b2f10762fc4b5069f"
|
||||
checksum = "24a241312cea5059b13574bb9b3861cabf758b879c15190b37b6d6fd63ab6876"
|
||||
dependencies = [
|
||||
"anstream",
|
||||
"anstyle",
|
||||
@@ -2236,9 +2236,9 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "clap_derive"
|
||||
version = "4.6.0"
|
||||
version = "4.5.55"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "1110bd8a634a1ab8cb04345d8d878267d57c3cf1b38d91b71af6686408bbca6a"
|
||||
checksum = "a92793da1a46a5f2a02a6f4c46c6496b28c43638adea8306fcb0caa1634f24e5"
|
||||
dependencies = [
|
||||
"heck",
|
||||
"proc-macro2",
|
||||
@@ -2248,9 +2248,9 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "clap_lex"
|
||||
version = "1.1.0"
|
||||
version = "1.0.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "c8d4a3bb8b1e0c1050499d1815f5ab16d04f0959b233085fb31653fbfc9d98f9"
|
||||
checksum = "3a822ea5bc7590f9d40f1ba12c0dc3c2760f3482c6984db1573ad11031420831"
|
||||
|
||||
[[package]]
|
||||
name = "cmake"
|
||||
@@ -2272,9 +2272,9 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "codspeed"
|
||||
version = "4.4.1"
|
||||
version = "4.3.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "b684e94583e85a5ca7e1a6454a89d76a5121240f2fb67eb564129d9bafdb9db0"
|
||||
checksum = "38c2eb3388ebe26b5a0ab6bf4969d9c4840143d7f6df07caa3cc851b0606cef6"
|
||||
dependencies = [
|
||||
"anyhow",
|
||||
"cc",
|
||||
@@ -2282,7 +2282,7 @@ dependencies = [
|
||||
"getrandom 0.2.17",
|
||||
"glob",
|
||||
"libc",
|
||||
"nix",
|
||||
"nix 0.30.1",
|
||||
"serde",
|
||||
"serde_json",
|
||||
"statrs",
|
||||
@@ -2290,9 +2290,9 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "codspeed-criterion-compat"
|
||||
version = "4.4.1"
|
||||
version = "4.3.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "2e65444156eb73ad7f57618188f8d4a281726d133ef55b96d1dcff89528609ab"
|
||||
checksum = "e1e270597a1d1e183f86d1cc9f94f0133654ee3daf201c17903ee29363555dd7"
|
||||
dependencies = [
|
||||
"clap",
|
||||
"codspeed",
|
||||
@@ -2303,9 +2303,9 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "codspeed-criterion-compat-walltime"
|
||||
version = "4.4.1"
|
||||
version = "4.3.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "96389aaa4bbb872ea4924dc0335b2bb181bcf28d6eedbe8fea29afcc5bde36a6"
|
||||
checksum = "e6c2613d2fac930fe34456be76f9124ee0800bb9db2e7fd2d6c65b9ebe98a292"
|
||||
dependencies = [
|
||||
"anes",
|
||||
"cast",
|
||||
@@ -2408,9 +2408,9 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "colorchoice"
|
||||
version = "1.0.5"
|
||||
version = "1.0.4"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "1d07550c9036bf2ae0c684c4297d503f838287c83c53686d05370d0e139ae570"
|
||||
checksum = "b05b61dc5112cbb17e4b6cd61790d9845d13888356391624cbe7e41efeac1e75"
|
||||
|
||||
[[package]]
|
||||
name = "colored"
|
||||
@@ -2782,7 +2782,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "e0b1fab2ae45819af2d0731d60f2afe17227ebb1a1538a236da84c93e9a60162"
|
||||
dependencies = [
|
||||
"dispatch2",
|
||||
"nix",
|
||||
"nix 0.31.2",
|
||||
"windows-sys 0.61.2",
|
||||
]
|
||||
|
||||
@@ -3023,9 +3023,9 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "derive-where"
|
||||
version = "1.6.1"
|
||||
version = "1.6.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "d08b3a0bcc0d079199cd476b2cae8435016ec11d1c0986c6901c5ac223041534"
|
||||
checksum = "ef941ded77d15ca19b40374869ac6000af1c9f2a4c0f3d4c70926287e6364a8f"
|
||||
dependencies = [
|
||||
"proc-macro2",
|
||||
"quote",
|
||||
@@ -5500,9 +5500,9 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "kasuari"
|
||||
version = "0.4.12"
|
||||
version = "0.4.11"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "bde5057d6143cc94e861d90f591b9303d6716c6b9602309150bd068853c10899"
|
||||
checksum = "8fe90c1150662e858c7d5f945089b7517b0a80d8bf7ba4b1b5ffc984e7230a5b"
|
||||
dependencies = [
|
||||
"hashbrown 0.16.1",
|
||||
"portable-atomic",
|
||||
@@ -5654,9 +5654,9 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "libz-sys"
|
||||
version = "1.1.25"
|
||||
version = "1.1.24"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "d52f4c29e2a68ac30c9087e1b772dc9f44a2b66ed44edf2266cf2be9b03dafc1"
|
||||
checksum = "4735e9cbde5aac84a5ce588f6b23a90b9b0b528f6c5a8db8a4aff300463a0839"
|
||||
dependencies = [
|
||||
"cc",
|
||||
"libc",
|
||||
@@ -5733,7 +5733,7 @@ dependencies = [
|
||||
"generator",
|
||||
"scoped-tls",
|
||||
"tracing",
|
||||
"tracing-subscriber 0.3.23",
|
||||
"tracing-subscriber 0.3.22",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
@@ -5781,9 +5781,9 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "lz4_flex"
|
||||
version = "0.12.1"
|
||||
version = "0.12.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "98c23545df7ecf1b16c303910a69b079e8e251d60f7dd2cc9b4177f2afaf1746"
|
||||
checksum = "ab6473172471198271ff72e9379150e9dfd70d8e533e0752a27e515b48dd375e"
|
||||
|
||||
[[package]]
|
||||
name = "mach2"
|
||||
@@ -6086,6 +6086,18 @@ dependencies = [
|
||||
"unsigned-varint",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "nix"
|
||||
version = "0.30.1"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "74523f3a35e05aba87a1d978330aef40f67b0304ac79c1c00b294c9830543db6"
|
||||
dependencies = [
|
||||
"bitflags 2.11.0",
|
||||
"cfg-if",
|
||||
"cfg_aliases",
|
||||
"libc",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "nix"
|
||||
version = "0.31.2"
|
||||
@@ -6301,9 +6313,9 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "objc2-core-foundation"
|
||||
version = "0.3.2"
|
||||
version = "0.3.1"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "2a180dd8642fa45cdb7dd721cd4c11b1cadd4929ce112ebd8b9f5803cc79d536"
|
||||
checksum = "1c10c2894a6fed806ade6027bcd50662746363a9589d3ec9d9bef30a4e4bc166"
|
||||
dependencies = [
|
||||
"bitflags 2.11.0",
|
||||
]
|
||||
@@ -6316,9 +6328,9 @@ checksum = "ef25abbcd74fb2609453eb695bd2f860d389e457f67dc17cafc8b8cbc89d0c33"
|
||||
|
||||
[[package]]
|
||||
name = "objc2-io-kit"
|
||||
version = "0.3.2"
|
||||
version = "0.3.1"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "33fafba39597d6dc1fb709123dfa8289d39406734be322956a69f0931c73bb15"
|
||||
checksum = "71c1c64d6120e51cd86033f67176b1cb66780c2efe34dec55176f77befd93c0a"
|
||||
dependencies = [
|
||||
"libc",
|
||||
"objc2-core-foundation",
|
||||
@@ -6335,9 +6347,9 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "once_cell"
|
||||
version = "1.21.4"
|
||||
version = "1.21.3"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "9f7c3e4beb33f85d45ae3e3a1792185706c8e16d043238c593331cc7cd313b50"
|
||||
checksum = "42f5e15c9953c5e4ccceeb2e7382a716482c34515315f7b03532b8b4e8393d2d"
|
||||
dependencies = [
|
||||
"critical-section",
|
||||
"portable-atomic",
|
||||
@@ -6506,7 +6518,7 @@ dependencies = [
|
||||
"opentelemetry",
|
||||
"tracing",
|
||||
"tracing-core",
|
||||
"tracing-subscriber 0.3.23",
|
||||
"tracing-subscriber 0.3.22",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
@@ -7709,7 +7721,7 @@ dependencies = [
|
||||
"csv",
|
||||
"ctrlc",
|
||||
"eyre",
|
||||
"nix",
|
||||
"nix 0.31.2",
|
||||
"reth-chainspec",
|
||||
"reth-cli-runner",
|
||||
"reth-cli-util",
|
||||
@@ -10422,7 +10434,7 @@ dependencies = [
|
||||
"tracing-journald",
|
||||
"tracing-logfmt",
|
||||
"tracing-samply",
|
||||
"tracing-subscriber 0.3.23",
|
||||
"tracing-subscriber 0.3.22",
|
||||
"tracing-tracy",
|
||||
"tracy-client",
|
||||
]
|
||||
@@ -10440,7 +10452,7 @@ dependencies = [
|
||||
"opentelemetry_sdk",
|
||||
"tracing",
|
||||
"tracing-opentelemetry",
|
||||
"tracing-subscriber 0.3.23",
|
||||
"tracing-subscriber 0.3.22",
|
||||
"url",
|
||||
]
|
||||
|
||||
@@ -11223,9 +11235,9 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "schannel"
|
||||
version = "0.1.29"
|
||||
version = "0.1.28"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "91c1b7e4904c873ef0710c1f407dde2e6287de2bebc1bbbf7d430bb7cbffd939"
|
||||
checksum = "891d81b926048e76efe18581bf793546b4c0eaf8448d72be8de2bbee5fd166e1"
|
||||
dependencies = [
|
||||
"windows-sys 0.61.2",
|
||||
]
|
||||
@@ -11498,9 +11510,9 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "serde_with"
|
||||
version = "3.18.0"
|
||||
version = "3.17.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "dd5414fad8e6907dbdd5bc441a50ae8d6e26151a03b1de04d89a5576de61d01f"
|
||||
checksum = "381b283ce7bc6b476d903296fb59d0d36633652b633b27f64db4fb46dcbfc3b9"
|
||||
dependencies = [
|
||||
"base64 0.22.1",
|
||||
"chrono",
|
||||
@@ -11517,11 +11529,11 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "serde_with_macros"
|
||||
version = "3.18.0"
|
||||
version = "3.17.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "d3db8978e608f1fe7357e211969fd9abdcae80bac1ba7a3369bb7eb6b404eb65"
|
||||
checksum = "a6d4e30573c8cb306ed6ab1dca8423eec9a463ea0e155f45399455e0368b27e0"
|
||||
dependencies = [
|
||||
"darling 0.23.0",
|
||||
"darling 0.21.3",
|
||||
"proc-macro2",
|
||||
"quote",
|
||||
"syn 2.0.117",
|
||||
@@ -11910,9 +11922,9 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "sysinfo"
|
||||
version = "0.38.4"
|
||||
version = "0.38.3"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "92ab6a2f8bfe508deb3c6406578252e491d299cbbf3bc0529ecc3313aee4a52f"
|
||||
checksum = "d03c61d2a49c649a15c407338afe7accafde9dac869995dccb73e5f7ef7d9034"
|
||||
dependencies = [
|
||||
"libc",
|
||||
"memchr",
|
||||
@@ -11953,9 +11965,9 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "tempfile"
|
||||
version = "3.27.0"
|
||||
version = "3.26.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "32497e9a4c7b38532efcdebeef879707aa9f794296a4f0244f6f69e9bc8574bd"
|
||||
checksum = "82a72c767771b47409d2345987fda8628641887d5466101319899796367354a0"
|
||||
dependencies = [
|
||||
"fastrand 2.3.0",
|
||||
"getrandom 0.4.2",
|
||||
@@ -12215,9 +12227,9 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "tinyvec"
|
||||
version = "1.11.0"
|
||||
version = "1.10.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "3e61e67053d25a4e82c844e8424039d9745781b3fc4f32b8d55ed50f5f667ef3"
|
||||
checksum = "bfa5fdc3bce6191a1dbc8c02d5c8bffcf557bafa17c124c5264a458f1b0613fa"
|
||||
dependencies = [
|
||||
"tinyvec_macros",
|
||||
]
|
||||
@@ -12502,7 +12514,7 @@ dependencies = [
|
||||
"crossbeam-channel",
|
||||
"thiserror 2.0.18",
|
||||
"time",
|
||||
"tracing-subscriber 0.3.23",
|
||||
"tracing-subscriber 0.3.22",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
@@ -12533,7 +12545,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "8b1581020d7a273442f5b45074a6a57d5757ad0a47dac0e9f0bd57b81936f3db"
|
||||
dependencies = [
|
||||
"tracing",
|
||||
"tracing-subscriber 0.3.23",
|
||||
"tracing-subscriber 0.3.22",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
@@ -12554,7 +12566,7 @@ checksum = "2d3a81ed245bfb62592b1e2bc153e77656d94ee6a0497683a65a12ccaf2438d0"
|
||||
dependencies = [
|
||||
"libc",
|
||||
"tracing-core",
|
||||
"tracing-subscriber 0.3.23",
|
||||
"tracing-subscriber 0.3.22",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
@@ -12577,7 +12589,7 @@ dependencies = [
|
||||
"time",
|
||||
"tracing",
|
||||
"tracing-core",
|
||||
"tracing-subscriber 0.3.23",
|
||||
"tracing-subscriber 0.3.22",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
@@ -12592,7 +12604,7 @@ dependencies = [
|
||||
"tracing",
|
||||
"tracing-core",
|
||||
"tracing-log",
|
||||
"tracing-subscriber 0.3.23",
|
||||
"tracing-subscriber 0.3.22",
|
||||
"web-time",
|
||||
]
|
||||
|
||||
@@ -12609,7 +12621,7 @@ dependencies = [
|
||||
"memmap2",
|
||||
"smallvec",
|
||||
"tracing-core",
|
||||
"tracing-subscriber 0.3.23",
|
||||
"tracing-subscriber 0.3.22",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
@@ -12633,9 +12645,9 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "tracing-subscriber"
|
||||
version = "0.3.23"
|
||||
version = "0.3.22"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "cb7f578e5945fb242538965c2d0b04418d38ec25c79d160cd279bf0731c8d319"
|
||||
checksum = "2f30143827ddab0d256fd843b7a66d164e9f271cfa0dde49142c5ca0ca291f1e"
|
||||
dependencies = [
|
||||
"matchers",
|
||||
"nu-ansi-term",
|
||||
@@ -12659,7 +12671,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "0eaa1852afa96e0fe9e44caa53dc0bd2d9d05e0f2611ce09f97f8677af56e4ba"
|
||||
dependencies = [
|
||||
"tracing-core",
|
||||
"tracing-subscriber 0.3.23",
|
||||
"tracing-subscriber 0.3.22",
|
||||
"tracy-client",
|
||||
]
|
||||
|
||||
@@ -13958,18 +13970,18 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "zerocopy"
|
||||
version = "0.8.42"
|
||||
version = "0.8.41"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "f2578b716f8a7a858b7f02d5bd870c14bf4ddbbcf3a4c05414ba6503640505e3"
|
||||
checksum = "96e13bc581734df6250836c59a5f44f3c57db9f9acb9dc8e3eaabdaf6170254d"
|
||||
dependencies = [
|
||||
"zerocopy-derive",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "zerocopy-derive"
|
||||
version = "0.8.42"
|
||||
version = "0.8.41"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "7e6cc098ea4d3bd6246687de65af3f920c430e236bee1e3bf2e441463f08a02f"
|
||||
checksum = "3545ea9e86d12ab9bba9fcd99b54c1556fd3199007def5a03c375623d05fac1c"
|
||||
dependencies = [
|
||||
"proc-macro2",
|
||||
"quote",
|
||||
|
||||
@@ -89,6 +89,7 @@ default = [
|
||||
"keccak-cache-global",
|
||||
"asm-keccak",
|
||||
"min-debug-logs",
|
||||
"rocksdb",
|
||||
]
|
||||
|
||||
otlp = [
|
||||
@@ -190,6 +191,8 @@ min-trace-logs = [
|
||||
]
|
||||
|
||||
trie-debug = ["reth-node-builder/trie-debug", "reth-node-core/trie-debug"]
|
||||
rocksdb = ["reth-ethereum-cli/rocksdb", "reth-node-core/rocksdb"]
|
||||
edge = ["rocksdb"]
|
||||
|
||||
[[bin]]
|
||||
name = "reth"
|
||||
|
||||
@@ -992,7 +992,7 @@ impl<N: NodePrimitives<SignedTx: SignedTransaction>> NewCanonicalChain<N> {
|
||||
///
|
||||
/// Returns the new tip for [`Self::Reorg`] and [`Self::Commit`] variants which commit at least
|
||||
/// 1 new block.
|
||||
pub fn tip(&self) -> &RecoveredBlock<N::Block> {
|
||||
pub fn tip(&self) -> &SealedBlock<N::Block> {
|
||||
match self {
|
||||
Self::Commit { new } | Self::Reorg { new, .. } => {
|
||||
new.last().expect("non empty blocks").recovered_block()
|
||||
|
||||
@@ -110,6 +110,7 @@ reth-provider = { workspace = true, features = ["test-utils"] }
|
||||
tempfile.workspace = true
|
||||
|
||||
[features]
|
||||
default = []
|
||||
arbitrary = [
|
||||
"dep:proptest",
|
||||
"dep:arbitrary",
|
||||
@@ -134,3 +135,6 @@ arbitrary = [
|
||||
"reth-primitives-traits/arbitrary",
|
||||
"reth-ethereum-primitives/arbitrary",
|
||||
]
|
||||
|
||||
rocksdb = ["reth-db-common/rocksdb", "reth-stages/rocksdb", "reth-provider/rocksdb", "reth-prune/rocksdb"]
|
||||
edge = ["rocksdb"]
|
||||
|
||||
@@ -21,6 +21,7 @@ use std::{
|
||||
};
|
||||
use tracing::{info, warn};
|
||||
|
||||
#[cfg(all(unix, feature = "rocksdb"))]
|
||||
mod rocksdb;
|
||||
|
||||
/// Interval for logging progress during checksum computation.
|
||||
@@ -72,6 +73,7 @@ enum Subcommand {
|
||||
limit: Option<usize>,
|
||||
},
|
||||
/// Calculates the checksum of a RocksDB table
|
||||
#[cfg(all(unix, feature = "rocksdb"))]
|
||||
Rocksdb {
|
||||
/// The RocksDB table
|
||||
#[arg(value_enum)]
|
||||
@@ -98,6 +100,7 @@ impl Command {
|
||||
Subcommand::StaticFile { segment, start_block, end_block, limit } => {
|
||||
checksum_static_file(tool, segment, start_block, end_block, limit)?;
|
||||
}
|
||||
#[cfg(all(unix, feature = "rocksdb"))]
|
||||
Subcommand::Rocksdb { table, limit } => {
|
||||
rocksdb::checksum_rocksdb(tool, table, limit)?;
|
||||
}
|
||||
|
||||
@@ -19,12 +19,11 @@ use reth_node_api::BlockTy;
|
||||
use reth_node_events::node::NodeEvent;
|
||||
use reth_provider::{
|
||||
providers::ProviderNodeTypes, BlockNumReader, HeaderProvider, ProviderError, ProviderFactory,
|
||||
RocksDBProviderFactory, StageCheckpointReader,
|
||||
StageCheckpointReader,
|
||||
};
|
||||
use reth_prune::PruneModes;
|
||||
use reth_stages::{prelude::*, ControlFlow, Pipeline, StageId, StageSet};
|
||||
use reth_static_file::StaticFileProducer;
|
||||
use reth_storage_api::StorageSettingsCache;
|
||||
use std::{path::Path, sync::Arc};
|
||||
use tokio::sync::watch;
|
||||
use tracing::{debug, error, info, warn};
|
||||
@@ -109,11 +108,7 @@ where
|
||||
|
||||
let provider = provider_factory.provider()?;
|
||||
let init_blocks = provider.tx_ref().entries::<tables::HeaderNumbers>()?;
|
||||
let init_txns = if provider_factory.cached_storage_settings().storage_v2 {
|
||||
provider_factory.rocksdb_provider().iter::<tables::TransactionHashNumbers>()?.count()
|
||||
} else {
|
||||
provider.tx_ref().entries::<tables::TransactionHashNumbers>()?
|
||||
};
|
||||
let init_txns = provider.tx_ref().entries::<tables::TransactionHashNumbers>()?;
|
||||
drop(provider);
|
||||
|
||||
let mut total_decoded_blocks = 0;
|
||||
@@ -220,12 +215,8 @@ where
|
||||
|
||||
let provider = provider_factory.provider()?;
|
||||
let total_imported_blocks = provider.tx_ref().entries::<tables::HeaderNumbers>()? - init_blocks;
|
||||
let current_txns = if provider_factory.cached_storage_settings().storage_v2 {
|
||||
provider_factory.rocksdb_provider().iter::<tables::TransactionHashNumbers>()?.count()
|
||||
} else {
|
||||
provider.tx_ref().entries::<tables::TransactionHashNumbers>()?
|
||||
};
|
||||
let total_imported_txns = current_txns - init_txns;
|
||||
let total_imported_txns =
|
||||
provider.tx_ref().entries::<tables::TransactionHashNumbers>()? - init_txns;
|
||||
|
||||
let result = ImportResult {
|
||||
total_decoded_blocks,
|
||||
|
||||
@@ -193,10 +193,7 @@ impl<C: ChainSpecParser> DownloadArgs<C> {
|
||||
let default_secret_key_path = data_dir.p2p_secret();
|
||||
let p2p_secret_key = self.network.secret_key(default_secret_key_path)?;
|
||||
let rlpx_socket = (self.network.addr, self.network.port).into();
|
||||
let boot_nodes = self
|
||||
.network
|
||||
.resolved_bootnodes()
|
||||
.unwrap_or_else(|| self.chain.bootnodes().unwrap_or_default());
|
||||
let boot_nodes = self.chain.bootnodes().unwrap_or_default();
|
||||
|
||||
let net =
|
||||
NetworkConfigBuilder::<N::NetworkPrimitives>::new(p2p_secret_key, Runtime::test())
|
||||
|
||||
@@ -12,6 +12,7 @@ use reth_node_metrics::{
|
||||
server::{MetricServer, MetricServerConfig},
|
||||
version::VersionInfo,
|
||||
};
|
||||
#[cfg(all(unix, feature = "rocksdb"))]
|
||||
use reth_provider::RocksDBProviderFactory;
|
||||
use reth_prune::PrunerBuilder;
|
||||
use reth_static_file::StaticFileProducer;
|
||||
@@ -121,6 +122,7 @@ impl<C: ChainSpecParser<ChainSpec: EthChainSpec + EthereumHardforks>> PruneComma
|
||||
}
|
||||
|
||||
// Flush and compact RocksDB to reclaim disk space after pruning
|
||||
#[cfg(all(unix, feature = "rocksdb"))]
|
||||
{
|
||||
info!(target: "reth::cli", "Flushing and compacting RocksDB...");
|
||||
provider_factory.rocksdb_provider().flush_and_compact()?;
|
||||
|
||||
@@ -75,3 +75,8 @@ path = "tests/e2e-testsuite/main.rs"
|
||||
[[test]]
|
||||
name = "rocksdb"
|
||||
path = "tests/rocksdb/main.rs"
|
||||
required-features = ["rocksdb"]
|
||||
|
||||
[features]
|
||||
rocksdb = ["reth-node-core/rocksdb", "reth-provider/rocksdb", "reth-cli-commands/rocksdb"]
|
||||
edge = ["rocksdb"]
|
||||
|
||||
@@ -1,5 +1,7 @@
|
||||
//! E2E tests for `RocksDB` provider functionality.
|
||||
|
||||
#![cfg(all(feature = "rocksdb", unix))]
|
||||
|
||||
use alloy_consensus::BlockHeader;
|
||||
use alloy_primitives::B256;
|
||||
use alloy_rpc_types_eth::{Transaction, TransactionReceipt};
|
||||
|
||||
@@ -139,6 +139,13 @@ trie-debug = [
|
||||
"reth-engine-primitives/trie-debug",
|
||||
"dep:serde_json",
|
||||
]
|
||||
rocksdb = [
|
||||
"reth-provider/rocksdb",
|
||||
"reth-prune/rocksdb",
|
||||
"reth-stages?/rocksdb",
|
||||
"reth-e2e-test-utils/rocksdb",
|
||||
]
|
||||
edge = ["rocksdb"]
|
||||
|
||||
[[test]]
|
||||
name = "e2e_testsuite"
|
||||
|
||||
@@ -38,7 +38,10 @@ use reth_trie_parallel::{
|
||||
proof_task::{ProofTaskCtx, ProofWorkerHandle},
|
||||
root::ParallelStateRootError,
|
||||
};
|
||||
use reth_trie_sparse::ParallelismThresholds;
|
||||
use reth_trie_sparse::{
|
||||
ArenaParallelSparseTrie, ConfigurableSparseTrie, ParallelSparseTrie, ParallelismThresholds,
|
||||
RevealableSparseTrie, SparseStateTrie,
|
||||
};
|
||||
use std::{
|
||||
ops::Not,
|
||||
sync::{
|
||||
@@ -57,10 +60,7 @@ pub mod prewarm;
|
||||
pub mod receipt_root_task;
|
||||
pub mod sparse_trie;
|
||||
|
||||
pub use preserved_sparse_trie::{
|
||||
PayloadSparseTrieCache, PayloadSparseTrieKind, PayloadSparseTrieStoreOutcome,
|
||||
SparseTrieCheckout,
|
||||
};
|
||||
use preserved_sparse_trie::{PreservedSparseTrie, SharedPreservedSparseTrie};
|
||||
|
||||
/// Default parallelism thresholds to use with the [`ParallelSparseTrie`].
|
||||
///
|
||||
@@ -104,52 +104,6 @@ type IteratorPayloadHandle<Evm, I, N> = PayloadHandle<
|
||||
<N as NodePrimitives>::Receipt,
|
||||
>;
|
||||
|
||||
/// Shared cache handles that can be exported to engine consumers and downstream payload builders.
|
||||
#[derive(Debug, Clone)]
|
||||
pub struct EngineSharedCaches<Evm: ConfigureEvm> {
|
||||
execution_cache: PayloadExecutionCache,
|
||||
sparse_trie_cache: PayloadSparseTrieCache,
|
||||
precompile_cache_map: PrecompileCacheMap<SpecFor<Evm>>,
|
||||
}
|
||||
|
||||
impl<Evm> Default for EngineSharedCaches<Evm>
|
||||
where
|
||||
Evm: ConfigureEvm,
|
||||
{
|
||||
fn default() -> Self {
|
||||
Self::with_sparse_trie_kind(PayloadSparseTrieKind::default())
|
||||
}
|
||||
}
|
||||
|
||||
impl<Evm> EngineSharedCaches<Evm>
|
||||
where
|
||||
Evm: ConfigureEvm,
|
||||
{
|
||||
/// Creates shared caches backed by the requested sparse trie implementation.
|
||||
pub fn with_sparse_trie_kind(sparse_trie_kind: PayloadSparseTrieKind) -> Self {
|
||||
Self {
|
||||
execution_cache: Default::default(),
|
||||
sparse_trie_cache: PayloadSparseTrieCache::new(sparse_trie_kind),
|
||||
precompile_cache_map: Default::default(),
|
||||
}
|
||||
}
|
||||
|
||||
/// Returns the shared execution cache handle for engine-internal use.
|
||||
pub(crate) fn execution_cache(&self) -> PayloadExecutionCache {
|
||||
self.execution_cache.clone()
|
||||
}
|
||||
|
||||
/// Returns the shared sparse trie cache handle.
|
||||
pub fn sparse_trie_cache(&self) -> PayloadSparseTrieCache {
|
||||
self.sparse_trie_cache.clone()
|
||||
}
|
||||
|
||||
/// Returns the shared precompile cache map.
|
||||
pub fn precompile_cache_map(&self) -> PrecompileCacheMap<SpecFor<Evm>> {
|
||||
self.precompile_cache_map.clone()
|
||||
}
|
||||
}
|
||||
|
||||
/// Entrypoint for executing the payload.
|
||||
#[derive(Debug)]
|
||||
pub struct PayloadProcessor<Evm>
|
||||
@@ -158,8 +112,8 @@ where
|
||||
{
|
||||
/// The executor used by to spawn tasks.
|
||||
executor: Runtime,
|
||||
/// Shared caches reused across payload processing.
|
||||
shared_caches: EngineSharedCaches<Evm>,
|
||||
/// The most recent cache used for execution.
|
||||
execution_cache: PayloadExecutionCache,
|
||||
/// Metrics for trie operations
|
||||
trie_metrics: MultiProofTaskMetrics,
|
||||
/// Cross-block cache size in bytes.
|
||||
@@ -172,12 +126,20 @@ where
|
||||
evm_config: Evm,
|
||||
/// Whether precompile cache should be disabled.
|
||||
precompile_cache_disabled: bool,
|
||||
/// Precompile cache map.
|
||||
precompile_cache_map: PrecompileCacheMap<SpecFor<Evm>>,
|
||||
/// A pruned `SparseStateTrie`, kept around as a cache of already revealed trie nodes and to
|
||||
/// re-use allocated memory. Stored with the block hash it was computed for to enable trie
|
||||
/// preservation across sequential payload validations.
|
||||
sparse_state_trie: SharedPreservedSparseTrie,
|
||||
/// LFU hot-slot capacity: max storage slots retained across prune cycles.
|
||||
sparse_trie_max_hot_slots: usize,
|
||||
/// LFU hot-account capacity: max account addresses retained across prune cycles.
|
||||
sparse_trie_max_hot_accounts: usize,
|
||||
/// Whether sparse trie cache pruning is fully disabled.
|
||||
disable_sparse_trie_cache_pruning: bool,
|
||||
/// Whether to use the arena-based sparse trie implementation.
|
||||
enable_arena_sparse_trie: bool,
|
||||
/// Whether to disable cache metrics recording.
|
||||
disable_cache_metrics: bool,
|
||||
}
|
||||
@@ -197,20 +159,23 @@ where
|
||||
executor: Runtime,
|
||||
evm_config: Evm,
|
||||
config: &TreeConfig,
|
||||
shared_caches: EngineSharedCaches<Evm>,
|
||||
precompile_cache_map: PrecompileCacheMap<SpecFor<Evm>>,
|
||||
) -> Self {
|
||||
Self {
|
||||
executor,
|
||||
shared_caches,
|
||||
execution_cache: Default::default(),
|
||||
trie_metrics: Default::default(),
|
||||
cross_block_cache_size: config.cross_block_cache_size(),
|
||||
disable_transaction_prewarming: config.disable_prewarming(),
|
||||
evm_config,
|
||||
disable_state_cache: config.disable_state_cache(),
|
||||
precompile_cache_disabled: config.precompile_cache_disabled(),
|
||||
precompile_cache_map,
|
||||
sparse_state_trie: SharedPreservedSparseTrie::default(),
|
||||
sparse_trie_max_hot_slots: config.sparse_trie_max_hot_slots(),
|
||||
sparse_trie_max_hot_accounts: config.sparse_trie_max_hot_accounts(),
|
||||
disable_sparse_trie_cache_pruning: config.disable_sparse_trie_cache_pruning(),
|
||||
enable_arena_sparse_trie: config.enable_arena_sparse_trie(),
|
||||
disable_cache_metrics: config.disable_cache_metrics(),
|
||||
}
|
||||
}
|
||||
@@ -224,8 +189,8 @@ where
|
||||
debug!(target: "engine::tree::payload_processor", "Waiting for execution cache and sparse trie locks");
|
||||
|
||||
// Wait for both caches in parallel using std threads
|
||||
let execution_cache = self.shared_caches.execution_cache();
|
||||
let sparse_trie = self.shared_caches.sparse_trie_cache();
|
||||
let execution_cache = self.execution_cache.clone();
|
||||
let sparse_trie = self.sparse_state_trie.clone();
|
||||
|
||||
// Use channels and spawn_blocking instead of std::thread::spawn
|
||||
let (execution_tx, execution_rx) = std::sync::mpsc::channel();
|
||||
@@ -539,12 +504,12 @@ where
|
||||
terminate_execution: Arc::new(AtomicBool::new(false)),
|
||||
executed_tx_index: Arc::clone(&executed_tx_index),
|
||||
precompile_cache_disabled: self.precompile_cache_disabled,
|
||||
precompile_cache_map: self.shared_caches.precompile_cache_map(),
|
||||
precompile_cache_map: self.precompile_cache_map.clone(),
|
||||
};
|
||||
|
||||
let (prewarm_task, to_prewarm_task) = PrewarmCacheTask::new(
|
||||
self.executor.clone(),
|
||||
self.shared_caches.execution_cache(),
|
||||
self.execution_cache.clone(),
|
||||
prewarm_ctx,
|
||||
to_multi_proof,
|
||||
);
|
||||
@@ -572,7 +537,7 @@ where
|
||||
/// instance.
|
||||
#[instrument(level = "debug", target = "engine::caching", skip(self))]
|
||||
fn cache_for(&self, parent_hash: B256) -> SavedCache {
|
||||
if let Some(cache) = self.shared_caches.execution_cache().get_cache_for(parent_hash) {
|
||||
if let Some(cache) = self.execution_cache.get_cache_for(parent_hash) {
|
||||
debug!("reusing execution cache");
|
||||
cache
|
||||
} else {
|
||||
@@ -597,11 +562,12 @@ where
|
||||
parent_state_root: B256,
|
||||
chunk_size: usize,
|
||||
) {
|
||||
let sparse_trie_cache = self.shared_caches.sparse_trie_cache();
|
||||
let preserved_sparse_trie = self.sparse_state_trie.clone();
|
||||
let trie_metrics = self.trie_metrics.clone();
|
||||
let max_hot_slots = self.sparse_trie_max_hot_slots;
|
||||
let max_hot_accounts = self.sparse_trie_max_hot_accounts;
|
||||
let disable_cache_pruning = self.disable_sparse_trie_cache_pruning;
|
||||
let enable_arena_sparse_trie = self.enable_arena_sparse_trie;
|
||||
let executor = self.executor.clone();
|
||||
|
||||
let parent_span = Span::current();
|
||||
@@ -611,19 +577,49 @@ where
|
||||
let _enter = debug_span!(target: "engine::tree::payload_processor", parent: parent_span, "sparse_trie_task")
|
||||
.entered();
|
||||
|
||||
// Reuse a stored SparseStateTrie if available, applying continuation logic.
|
||||
// If this payload's parent state root matches the preserved trie's anchor,
|
||||
// we can reuse the pruned trie structure. Otherwise, we clear the trie but
|
||||
// keep allocations.
|
||||
let start = Instant::now();
|
||||
let mut checkout = sparse_trie_cache.take_or_create_for(parent_state_root);
|
||||
let preserved = preserved_sparse_trie.take();
|
||||
trie_metrics
|
||||
.sparse_trie_cache_wait_duration_histogram
|
||||
.record(start.elapsed().as_secs_f64());
|
||||
checkout.set_hot_cache_capacities(max_hot_slots, max_hot_accounts);
|
||||
|
||||
let mut task = SparseTrieCacheTask::new_with_checkout(
|
||||
let mut sparse_state_trie = preserved
|
||||
.map(|preserved| preserved.into_trie_for(parent_state_root))
|
||||
.unwrap_or_else(|| {
|
||||
debug!(
|
||||
target: "engine::tree::payload_processor",
|
||||
"Creating new sparse trie - no preserved trie available"
|
||||
);
|
||||
let default_trie = if enable_arena_sparse_trie {
|
||||
RevealableSparseTrie::blind_from(
|
||||
ConfigurableSparseTrie::Arena(ArenaParallelSparseTrie::default()),
|
||||
)
|
||||
} else {
|
||||
RevealableSparseTrie::blind_from(
|
||||
ConfigurableSparseTrie::HashMap(
|
||||
ParallelSparseTrie::default().with_parallelism_thresholds(
|
||||
PARALLEL_SPARSE_TRIE_PARALLELISM_THRESHOLDS,
|
||||
),
|
||||
),
|
||||
)
|
||||
};
|
||||
SparseStateTrie::default()
|
||||
.with_accounts_trie(default_trie.clone())
|
||||
.with_default_storage_trie(default_trie)
|
||||
.with_updates(true)
|
||||
});
|
||||
sparse_state_trie.set_hot_cache_capacities(max_hot_slots, max_hot_accounts);
|
||||
|
||||
let mut task = SparseTrieCacheTask::new_with_trie(
|
||||
&executor,
|
||||
from_multi_proof,
|
||||
proof_worker_handle,
|
||||
trie_metrics.clone(),
|
||||
checkout,
|
||||
sparse_state_trie,
|
||||
chunk_size,
|
||||
);
|
||||
|
||||
@@ -634,7 +630,7 @@ where
|
||||
// causing take() to return None and forcing it to create a new empty trie
|
||||
// instead of reusing the preserved one. Holding the guard ensures the next
|
||||
// block's take() blocks until we've stored the trie for reuse.
|
||||
let mut guard = sparse_trie_cache.lock();
|
||||
let mut guard = preserved_sparse_trie.lock();
|
||||
|
||||
let task_result = result.as_ref().ok().cloned();
|
||||
// Send state root computation result - next block may start but will block on take()
|
||||
@@ -649,7 +645,7 @@ where
|
||||
SPARSE_TRIE_MAX_NODES_SHRINK_CAPACITY,
|
||||
SPARSE_TRIE_MAX_VALUES_SHRINK_CAPACITY,
|
||||
);
|
||||
trie.store_prepared_cleared_with_guard(&mut guard);
|
||||
guard.store(PreservedSparseTrie::cleared(trie));
|
||||
drop(guard);
|
||||
executor.spawn_drop(deferred);
|
||||
return;
|
||||
@@ -678,7 +674,7 @@ where
|
||||
trie_metrics
|
||||
.sparse_trie_retained_storage_tries
|
||||
.set(trie.retained_storage_tries_count() as f64);
|
||||
trie.store_anchored_with_guard(&mut guard, result.state_root);
|
||||
guard.store(PreservedSparseTrie::anchored(trie, result.state_root));
|
||||
deferred
|
||||
} else {
|
||||
debug!(
|
||||
@@ -689,7 +685,7 @@ where
|
||||
SPARSE_TRIE_MAX_NODES_SHRINK_CAPACITY,
|
||||
SPARSE_TRIE_MAX_VALUES_SHRINK_CAPACITY,
|
||||
);
|
||||
trie.store_prepared_cleared_with_guard(&mut guard);
|
||||
guard.store(PreservedSparseTrie::cleared(trie));
|
||||
deferred
|
||||
};
|
||||
drop(guard);
|
||||
@@ -710,7 +706,7 @@ where
|
||||
bundle_state: &BundleState,
|
||||
) {
|
||||
let disable_cache_metrics = self.disable_cache_metrics;
|
||||
self.shared_caches.execution_cache().update_with_guard(|cached| {
|
||||
self.execution_cache.update_with_guard(|cached| {
|
||||
if cached.as_ref().is_some_and(|c| c.executed_block_hash() != block_with_parent.parent) {
|
||||
debug!(
|
||||
target: "engine::caching",
|
||||
@@ -1014,7 +1010,7 @@ impl<R> Drop for CacheTaskHandle<R> {
|
||||
/// - Prepares data for state root proof computation
|
||||
/// - Runs concurrently but must not interfere with cache saves
|
||||
#[derive(Clone, Debug, Default)]
|
||||
pub(crate) struct PayloadExecutionCache {
|
||||
pub struct PayloadExecutionCache {
|
||||
/// Guarded cloneable cache identified by a block hash.
|
||||
inner: Arc<RwLock<Option<SavedCache>>>,
|
||||
/// Metrics for cache operations.
|
||||
@@ -1022,11 +1018,11 @@ pub(crate) struct PayloadExecutionCache {
|
||||
}
|
||||
|
||||
impl PayloadExecutionCache {
|
||||
/// Returns the cache backing store for `parent_hash` if it's available for reuse.
|
||||
/// Returns the cache for `parent_hash` if it's available for use.
|
||||
///
|
||||
/// If the tracked cache is available but keyed to a different parent hash, the cache is
|
||||
/// cleared and returned so callers can reuse the underlying allocations without carrying over
|
||||
/// stale state.
|
||||
/// A cache is considered available when:
|
||||
/// - It exists and matches the requested parent hash
|
||||
/// - No other tasks are currently using it (checked via Arc reference count)
|
||||
#[instrument(level = "debug", target = "engine::tree::payload_processor", skip(self))]
|
||||
pub(crate) fn get_cache_for(&self, parent_hash: B256) -> Option<SavedCache> {
|
||||
let start = Instant::now();
|
||||
@@ -1065,7 +1061,7 @@ impl PayloadExecutionCache {
|
||||
// and picking up polluted data if the fork block fails.
|
||||
c.clear_with_hash(parent_hash);
|
||||
}
|
||||
return Some(c.clone());
|
||||
return Some(c.clone())
|
||||
} else if hash_matches {
|
||||
self.metrics.execution_cache_in_use.increment(1);
|
||||
}
|
||||
@@ -1082,7 +1078,7 @@ impl PayloadExecutionCache {
|
||||
/// This is useful for synchronization before starting payload processing.
|
||||
///
|
||||
/// Returns the time spent waiting for the lock.
|
||||
pub(crate) fn wait_for_availability(&self) -> Duration {
|
||||
pub fn wait_for_availability(&self) -> Duration {
|
||||
let start = Instant::now();
|
||||
// Acquire write lock to wait for any current holders to finish
|
||||
let _guard = self.inner.write();
|
||||
@@ -1110,7 +1106,7 @@ impl PayloadExecutionCache {
|
||||
///
|
||||
/// Violating this requirement can result in cache corruption, incorrect state data,
|
||||
/// and potential consensus failures.
|
||||
pub(crate) fn update_with_guard<F>(&self, update_fn: F)
|
||||
pub fn update_with_guard<F>(&self, update_fn: F)
|
||||
where
|
||||
F: FnOnce(&mut Option<SavedCache>),
|
||||
{
|
||||
@@ -1179,9 +1175,8 @@ mod tests {
|
||||
use super::PayloadExecutionCache;
|
||||
use crate::tree::{
|
||||
cached_state::{CachedStateMetrics, ExecutionCache, SavedCache},
|
||||
payload_processor::{
|
||||
evm_state_to_hashed_post_state, EngineSharedCaches, ExecutionEnv, PayloadProcessor,
|
||||
},
|
||||
payload_processor::{evm_state_to_hashed_post_state, ExecutionEnv, PayloadProcessor},
|
||||
precompile_cache::PrecompileCacheMap,
|
||||
StateProviderBuilder, TreeConfig,
|
||||
};
|
||||
use alloy_eips::eip1898::{BlockNumHash, BlockWithParent};
|
||||
@@ -1293,7 +1288,7 @@ mod tests {
|
||||
reth_tasks::Runtime::test(),
|
||||
EthEvmConfig::new(Arc::new(ChainSpec::default())),
|
||||
&TreeConfig::default(),
|
||||
EngineSharedCaches::default(),
|
||||
PrecompileCacheMap::default(),
|
||||
);
|
||||
|
||||
let parent_hash = B256::from([1u8; 32]);
|
||||
@@ -1305,17 +1300,13 @@ mod tests {
|
||||
let bundle_state = BundleState::default();
|
||||
|
||||
// Cache should be empty initially
|
||||
assert!(payload_processor
|
||||
.shared_caches
|
||||
.execution_cache()
|
||||
.get_cache_for(block_hash)
|
||||
.is_none());
|
||||
assert!(payload_processor.execution_cache.get_cache_for(block_hash).is_none());
|
||||
|
||||
// Update cache with inserted block
|
||||
payload_processor.on_inserted_executed_block(block_with_parent, &bundle_state);
|
||||
|
||||
// Cache should now exist for the block hash
|
||||
let cached = payload_processor.shared_caches.execution_cache().get_cache_for(block_hash);
|
||||
let cached = payload_processor.execution_cache.get_cache_for(block_hash);
|
||||
assert!(cached.is_some());
|
||||
assert_eq!(cached.unwrap().executed_block_hash(), block_hash);
|
||||
}
|
||||
@@ -1326,14 +1317,13 @@ mod tests {
|
||||
reth_tasks::Runtime::test(),
|
||||
EthEvmConfig::new(Arc::new(ChainSpec::default())),
|
||||
&TreeConfig::default(),
|
||||
EngineSharedCaches::default(),
|
||||
PrecompileCacheMap::default(),
|
||||
);
|
||||
|
||||
// Setup: populate cache with block 1
|
||||
let block1_hash = B256::from([1u8; 32]);
|
||||
payload_processor
|
||||
.shared_caches
|
||||
.execution_cache()
|
||||
.execution_cache
|
||||
.update_with_guard(|slot| *slot = Some(make_saved_cache(block1_hash)));
|
||||
|
||||
// Try to insert block 3 with wrong parent (should skip and keep block 1's cache)
|
||||
@@ -1348,11 +1338,11 @@ mod tests {
|
||||
payload_processor.on_inserted_executed_block(block_with_parent, &bundle_state);
|
||||
|
||||
// Cache should still be for block 1 (unchanged)
|
||||
let cached = payload_processor.shared_caches.execution_cache().get_cache_for(block1_hash);
|
||||
let cached = payload_processor.execution_cache.get_cache_for(block1_hash);
|
||||
assert!(cached.is_some(), "Original cache should be preserved");
|
||||
|
||||
// Cache for block 3 should not exist
|
||||
let cached3 = payload_processor.shared_caches.execution_cache().get_cache_for(block3_hash);
|
||||
let cached3 = payload_processor.execution_cache.get_cache_for(block3_hash);
|
||||
assert!(cached3.is_none(), "New block cache should not be created on mismatch");
|
||||
}
|
||||
|
||||
@@ -1462,7 +1452,7 @@ mod tests {
|
||||
reth_tasks::Runtime::test(),
|
||||
EthEvmConfig::new(factory.chain_spec()),
|
||||
&TreeConfig::default(),
|
||||
EngineSharedCaches::default(),
|
||||
PrecompileCacheMap::default(),
|
||||
);
|
||||
|
||||
let provider_factory = BlockchainProvider::new(factory).unwrap();
|
||||
|
||||
@@ -1,128 +1,44 @@
|
||||
//! Preserved sparse trie for reuse across payload validations.
|
||||
|
||||
use super::{
|
||||
PARALLEL_SPARSE_TRIE_PARALLELISM_THRESHOLDS, SPARSE_TRIE_MAX_NODES_SHRINK_CAPACITY,
|
||||
SPARSE_TRIE_MAX_VALUES_SHRINK_CAPACITY,
|
||||
};
|
||||
use alloy_primitives::B256;
|
||||
use parking_lot::Mutex;
|
||||
use reth_trie_sparse::{
|
||||
ArenaParallelSparseTrie, ConfigurableSparseTrie, ParallelSparseTrie, RevealableSparseTrie,
|
||||
SparseStateTrie,
|
||||
};
|
||||
use std::{
|
||||
ops::{Deref, DerefMut},
|
||||
sync::Arc,
|
||||
time::{Duration, Instant},
|
||||
};
|
||||
use reth_trie_sparse::{ConfigurableSparseTrie, SparseStateTrie};
|
||||
use std::{sync::Arc, time::Instant};
|
||||
use tracing::debug;
|
||||
|
||||
/// Type alias for the sparse trie type used in preservation.
|
||||
type SparseTrie = SparseStateTrie<ConfigurableSparseTrie, ConfigurableSparseTrie>;
|
||||
pub(super) type SparseTrie = SparseStateTrie<ConfigurableSparseTrie, ConfigurableSparseTrie>;
|
||||
|
||||
/// Sparse trie implementation used by [`PayloadSparseTrieCache`].
|
||||
#[derive(Debug, Default, Clone, Copy, PartialEq, Eq)]
|
||||
pub enum PayloadSparseTrieKind {
|
||||
/// Back sparse trie storage with hash maps.
|
||||
#[default]
|
||||
HashMap,
|
||||
/// Back sparse trie storage with arena allocations.
|
||||
Arena,
|
||||
}
|
||||
|
||||
impl From<bool> for PayloadSparseTrieKind {
|
||||
fn from(enable_arena_sparse_trie: bool) -> Self {
|
||||
if enable_arena_sparse_trie {
|
||||
Self::Arena
|
||||
} else {
|
||||
Self::HashMap
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug, Default)]
|
||||
struct PayloadSparseTrieState {
|
||||
latest_checkout_id: u64,
|
||||
preserved: Option<PreservedSparseTrie>,
|
||||
}
|
||||
|
||||
/// Outcome of storing a checked-out sparse trie back into the shared cache.
|
||||
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
|
||||
pub enum PayloadSparseTrieStoreOutcome {
|
||||
/// The checkout was the most recent lease and the trie was stored.
|
||||
Stored,
|
||||
/// A newer checkout had already been issued, so this stale lease was ignored.
|
||||
IgnoredStaleCheckout,
|
||||
}
|
||||
|
||||
/// Shared sparse trie cache that can be reused across payload validations.
|
||||
/// Shared handle to a preserved sparse trie that can be reused across payload validations.
|
||||
///
|
||||
/// This is the public sparse-trie SDK surface exposed through
|
||||
/// [`EngineSharedCaches`](super::EngineSharedCaches). Callers take or create a trie, use it for
|
||||
/// payload work, then store it back either anchored to the resulting state root or cleared for
|
||||
/// allocation reuse.
|
||||
#[derive(Debug, Clone)]
|
||||
pub struct PayloadSparseTrieCache {
|
||||
kind: PayloadSparseTrieKind,
|
||||
state: Arc<Mutex<PayloadSparseTrieState>>,
|
||||
}
|
||||
/// This is stored in [`PayloadProcessor`](super::PayloadProcessor) and cloned to pass to
|
||||
/// [`SparseTrieCacheTask`](super::sparse_trie::SparseTrieCacheTask) for trie reuse.
|
||||
#[derive(Debug, Default, Clone)]
|
||||
pub(super) struct SharedPreservedSparseTrie(Arc<Mutex<Option<PreservedSparseTrie>>>);
|
||||
|
||||
impl Default for PayloadSparseTrieCache {
|
||||
fn default() -> Self {
|
||||
Self::new(PayloadSparseTrieKind::default())
|
||||
}
|
||||
}
|
||||
|
||||
impl PayloadSparseTrieCache {
|
||||
/// Creates a sparse trie cache backed by the requested trie implementation.
|
||||
pub fn new(kind: PayloadSparseTrieKind) -> Self {
|
||||
Self { kind, state: Arc::new(Mutex::new(PayloadSparseTrieState::default())) }
|
||||
impl SharedPreservedSparseTrie {
|
||||
/// Takes the preserved trie if present, leaving `None` in its place.
|
||||
pub(super) fn take(&self) -> Option<PreservedSparseTrie> {
|
||||
self.0.lock().take()
|
||||
}
|
||||
|
||||
/// Returns the sparse trie implementation used when the cache needs to create a new trie.
|
||||
pub const fn kind(&self) -> PayloadSparseTrieKind {
|
||||
self.kind
|
||||
}
|
||||
|
||||
/// Takes a preserved trie for `parent_state_root` or creates a new trie if the cache is empty.
|
||||
pub fn take_or_create_for(&self, parent_state_root: B256) -> SparseTrieCheckout {
|
||||
let start = Instant::now();
|
||||
let mut state = self.state.lock();
|
||||
state.latest_checkout_id += 1;
|
||||
let checkout_id = state.latest_checkout_id;
|
||||
let trie = state
|
||||
.preserved
|
||||
.take()
|
||||
.map(|preserved| preserved.into_trie_for(parent_state_root))
|
||||
.unwrap_or_else(|| {
|
||||
debug!(
|
||||
target: "engine::tree::payload_processor",
|
||||
%parent_state_root,
|
||||
kind = ?self.kind,
|
||||
"Creating new sparse trie - no preserved trie available"
|
||||
);
|
||||
new_sparse_trie(self.kind)
|
||||
});
|
||||
drop(state);
|
||||
|
||||
let elapsed = start.elapsed();
|
||||
if elapsed.as_millis() > 5 {
|
||||
debug!(
|
||||
target: "engine::tree::payload_processor",
|
||||
blocked_for=?elapsed,
|
||||
"Waited for preserved sparse trie checkout"
|
||||
);
|
||||
}
|
||||
|
||||
SparseTrieCheckout { trie: Some(trie), cache: self.clone(), checkout_id }
|
||||
/// Acquires a guard that blocks `take()` until dropped.
|
||||
/// Use this before sending the state root result to ensure the next block
|
||||
/// waits for the trie to be stored.
|
||||
pub(super) fn lock(&self) -> PreservedTrieGuard<'_> {
|
||||
PreservedTrieGuard(self.0.lock())
|
||||
}
|
||||
|
||||
/// Waits until the sparse trie lock becomes available.
|
||||
///
|
||||
/// This acquires and immediately releases the lock, ensuring that any
|
||||
/// ongoing operations complete before returning. Useful for synchronization
|
||||
/// before starting payload processing.
|
||||
///
|
||||
/// Returns the time spent waiting for the lock.
|
||||
pub fn wait_for_availability(&self) -> Duration {
|
||||
pub(super) fn wait_for_availability(&self) -> std::time::Duration {
|
||||
let start = Instant::now();
|
||||
let _guard = self.state.lock();
|
||||
let _guard = self.0.lock();
|
||||
let elapsed = start.elapsed();
|
||||
if elapsed.as_millis() > 5 {
|
||||
debug!(
|
||||
@@ -133,142 +49,27 @@ impl PayloadSparseTrieCache {
|
||||
}
|
||||
elapsed
|
||||
}
|
||||
|
||||
/// Acquires a guard that blocks cache mutation until dropped.
|
||||
///
|
||||
/// Engine-internal code uses this before making the state-root result visible so the next
|
||||
/// payload cannot observe an empty cache between send and store.
|
||||
pub(super) fn lock(&self) -> PreservedTrieGuard<'_> {
|
||||
PreservedTrieGuard { state: self.state.lock() }
|
||||
}
|
||||
}
|
||||
|
||||
/// A checked-out sparse trie lease.
|
||||
///
|
||||
/// This dereferences to [`SparseStateTrie`] so callers can reuse the trie directly. If the lease is
|
||||
/// dropped without being stored back, a cleared trie is returned to the shared cache unless a newer
|
||||
/// checkout has already superseded it.
|
||||
#[derive(Debug)]
|
||||
pub struct SparseTrieCheckout {
|
||||
trie: Option<SparseTrie>,
|
||||
cache: PayloadSparseTrieCache,
|
||||
checkout_id: u64,
|
||||
}
|
||||
|
||||
impl SparseTrieCheckout {
|
||||
/// Stores the trie back into the shared cache anchored to the given state root.
|
||||
pub fn store_anchored(self, state_root: B256) -> PayloadSparseTrieStoreOutcome {
|
||||
let cache = self.cache.clone();
|
||||
let mut guard = cache.lock();
|
||||
self.store_anchored_with_guard(&mut guard, state_root)
|
||||
}
|
||||
|
||||
/// Stores the trie back into the shared cache in a cleared state.
|
||||
pub fn store_cleared(mut self) -> PayloadSparseTrieStoreOutcome {
|
||||
let cache = self.cache.clone();
|
||||
let mut trie = self.take_trie();
|
||||
prepare_cleared_trie(&mut trie);
|
||||
let deferred = trie.take_deferred_drops();
|
||||
let mut guard = cache.lock();
|
||||
let outcome = guard.store(self.checkout_id, PreservedSparseTrie::cleared(trie));
|
||||
drop(guard);
|
||||
drop(deferred);
|
||||
outcome
|
||||
}
|
||||
|
||||
/// Stores the trie back into the shared cache anchored to the given state root while the
|
||||
/// caller is already holding the preservation lock.
|
||||
pub(super) fn store_anchored_with_guard(
|
||||
mut self,
|
||||
guard: &mut PreservedTrieGuard<'_>,
|
||||
state_root: B256,
|
||||
) -> PayloadSparseTrieStoreOutcome {
|
||||
guard.store(self.checkout_id, PreservedSparseTrie::anchored(self.take_trie(), state_root))
|
||||
}
|
||||
|
||||
/// Stores an already-cleared trie back into the shared cache while the caller is already
|
||||
/// holding the preservation lock.
|
||||
pub(super) fn store_prepared_cleared_with_guard(
|
||||
mut self,
|
||||
guard: &mut PreservedTrieGuard<'_>,
|
||||
) -> PayloadSparseTrieStoreOutcome {
|
||||
guard.store(self.checkout_id, PreservedSparseTrie::cleared(self.take_trie()))
|
||||
}
|
||||
|
||||
fn take_trie(&mut self) -> SparseTrie {
|
||||
self.trie.take().expect("sparse trie checkout must hold a trie until it is stored")
|
||||
}
|
||||
}
|
||||
|
||||
impl Deref for SparseTrieCheckout {
|
||||
type Target = SparseTrie;
|
||||
|
||||
fn deref(&self) -> &Self::Target {
|
||||
self.trie.as_ref().expect("sparse trie checkout must hold a trie until it is stored")
|
||||
}
|
||||
}
|
||||
|
||||
impl DerefMut for SparseTrieCheckout {
|
||||
fn deref_mut(&mut self) -> &mut Self::Target {
|
||||
self.trie.as_mut().expect("sparse trie checkout must hold a trie until it is stored")
|
||||
}
|
||||
}
|
||||
|
||||
impl Drop for SparseTrieCheckout {
|
||||
fn drop(&mut self) {
|
||||
let Some(mut trie) = self.trie.take() else { return };
|
||||
|
||||
debug!(
|
||||
target: "engine::tree::payload_processor",
|
||||
checkout_id = self.checkout_id,
|
||||
"Sparse trie checkout dropped before store, returning cleared trie to cache"
|
||||
);
|
||||
|
||||
prepare_cleared_trie(&mut trie);
|
||||
let deferred = trie.take_deferred_drops();
|
||||
let mut guard = self.cache.lock();
|
||||
let _ = guard.store(self.checkout_id, PreservedSparseTrie::cleared(trie));
|
||||
drop(guard);
|
||||
drop(deferred);
|
||||
}
|
||||
}
|
||||
|
||||
/// Guard that holds the lock on the preserved trie.
|
||||
/// While held, take-or-create calls will block. Call `store()` to save the trie before dropping.
|
||||
pub(super) struct PreservedTrieGuard<'a> {
|
||||
state: parking_lot::MutexGuard<'a, PayloadSparseTrieState>,
|
||||
}
|
||||
/// While held, `take()` will block. Call `store()` to save the trie before dropping.
|
||||
pub(super) struct PreservedTrieGuard<'a>(parking_lot::MutexGuard<'a, Option<PreservedSparseTrie>>);
|
||||
|
||||
impl PreservedTrieGuard<'_> {
|
||||
/// Stores a preserved trie for later reuse if the checkout is still current.
|
||||
fn store(
|
||||
&mut self,
|
||||
checkout_id: u64,
|
||||
trie: PreservedSparseTrie,
|
||||
) -> PayloadSparseTrieStoreOutcome {
|
||||
if checkout_id != self.state.latest_checkout_id {
|
||||
debug!(
|
||||
target: "engine::tree::payload_processor",
|
||||
checkout_id,
|
||||
latest_checkout_id = self.state.latest_checkout_id,
|
||||
"Ignoring stale sparse trie checkout"
|
||||
);
|
||||
return PayloadSparseTrieStoreOutcome::IgnoredStaleCheckout;
|
||||
}
|
||||
|
||||
self.state.preserved.replace(trie);
|
||||
PayloadSparseTrieStoreOutcome::Stored
|
||||
/// Stores a preserved trie for later reuse.
|
||||
pub(super) fn store(&mut self, trie: PreservedSparseTrie) {
|
||||
self.0.replace(trie);
|
||||
}
|
||||
}
|
||||
|
||||
/// A preserved sparse trie that can be reused across payload validations.
|
||||
///
|
||||
/// The trie exists in one of two states:
|
||||
/// - **Anchored**: Has a computed state root and can be reused for payloads whose parent state
|
||||
/// root matches the anchor.
|
||||
/// - **Anchored**: Has a computed state root and can be reused for payloads whose parent state root
|
||||
/// matches the anchor.
|
||||
/// - **Cleared**: Trie data has been cleared but allocations are preserved for reuse.
|
||||
#[derive(Debug)]
|
||||
enum PreservedSparseTrie {
|
||||
pub(super) enum PreservedSparseTrie {
|
||||
/// Trie with a computed state root that can be reused for continuation payloads.
|
||||
Anchored {
|
||||
/// The sparse state trie (pruned after root computation).
|
||||
@@ -286,17 +87,24 @@ enum PreservedSparseTrie {
|
||||
|
||||
impl PreservedSparseTrie {
|
||||
/// Creates a new anchored preserved trie.
|
||||
const fn anchored(trie: SparseTrie, state_root: B256) -> Self {
|
||||
///
|
||||
/// The `state_root` is the computed state root from the trie, which becomes the
|
||||
/// anchor for determining if subsequent payloads can reuse this trie.
|
||||
pub(super) const fn anchored(trie: SparseTrie, state_root: B256) -> Self {
|
||||
Self::Anchored { trie, state_root }
|
||||
}
|
||||
|
||||
/// Creates a cleared preserved trie (allocations preserved, data cleared).
|
||||
const fn cleared(trie: SparseTrie) -> Self {
|
||||
pub(super) const fn cleared(trie: SparseTrie) -> Self {
|
||||
Self::Cleared { trie }
|
||||
}
|
||||
|
||||
/// Consumes self and returns the trie for reuse.
|
||||
fn into_trie_for(self, parent_state_root: B256) -> SparseTrie {
|
||||
///
|
||||
/// If the preserved trie is anchored and the parent state root matches, the pruned
|
||||
/// trie structure is reused directly. Otherwise, the trie is cleared but allocations
|
||||
/// are preserved to reduce memory overhead.
|
||||
pub(super) fn into_trie_for(self, parent_state_root: B256) -> SparseTrie {
|
||||
match self {
|
||||
Self::Anchored { trie, state_root } if state_root == parent_state_root => {
|
||||
debug!(
|
||||
@@ -327,111 +135,3 @@ impl PreservedSparseTrie {
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
fn new_sparse_trie(kind: PayloadSparseTrieKind) -> SparseTrie {
|
||||
let default_trie = match kind {
|
||||
PayloadSparseTrieKind::HashMap => {
|
||||
RevealableSparseTrie::blind_from(ConfigurableSparseTrie::HashMap(
|
||||
ParallelSparseTrie::default()
|
||||
.with_parallelism_thresholds(PARALLEL_SPARSE_TRIE_PARALLELISM_THRESHOLDS),
|
||||
))
|
||||
}
|
||||
PayloadSparseTrieKind::Arena => RevealableSparseTrie::blind_from(
|
||||
ConfigurableSparseTrie::Arena(ArenaParallelSparseTrie::default()),
|
||||
),
|
||||
};
|
||||
|
||||
SparseStateTrie::default()
|
||||
.with_accounts_trie(default_trie.clone())
|
||||
.with_default_storage_trie(default_trie)
|
||||
.with_updates(true)
|
||||
}
|
||||
|
||||
fn prepare_cleared_trie(trie: &mut SparseTrie) {
|
||||
trie.clear();
|
||||
trie.shrink_to(SPARSE_TRIE_MAX_NODES_SHRINK_CAPACITY, SPARSE_TRIE_MAX_VALUES_SHRINK_CAPACITY);
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::*;
|
||||
|
||||
#[test]
|
||||
fn take_or_create_reuses_matching_anchor() {
|
||||
let cache = PayloadSparseTrieCache::default();
|
||||
let state_root = B256::with_last_byte(1);
|
||||
|
||||
assert_eq!(
|
||||
cache.take_or_create_for(state_root).store_anchored(state_root),
|
||||
PayloadSparseTrieStoreOutcome::Stored
|
||||
);
|
||||
|
||||
match cache.state.lock().preserved.as_ref() {
|
||||
Some(PreservedSparseTrie::Anchored { state_root: anchored, .. }) => {
|
||||
assert_eq!(*anchored, state_root);
|
||||
}
|
||||
other => panic!("expected anchored trie, got {other:?}"),
|
||||
}
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn drop_restores_cleared_trie() {
|
||||
let cache = PayloadSparseTrieCache::default();
|
||||
let state_root = B256::with_last_byte(2);
|
||||
|
||||
let mut checkout = cache.take_or_create_for(state_root);
|
||||
checkout.set_updates(true);
|
||||
drop(checkout);
|
||||
|
||||
match cache.state.lock().preserved.as_ref() {
|
||||
Some(PreservedSparseTrie::Cleared { .. }) => {}
|
||||
other => panic!("expected cleared trie, got {other:?}"),
|
||||
}
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn stale_checkout_does_not_overwrite_newer_store() {
|
||||
let cache = PayloadSparseTrieCache::default();
|
||||
let parent_state_root = B256::with_last_byte(3);
|
||||
let anchored_state_root = B256::with_last_byte(4);
|
||||
|
||||
let stale = cache.take_or_create_for(parent_state_root);
|
||||
let fresh = cache.take_or_create_for(parent_state_root);
|
||||
|
||||
assert_eq!(
|
||||
fresh.store_anchored(anchored_state_root),
|
||||
PayloadSparseTrieStoreOutcome::Stored
|
||||
);
|
||||
assert_eq!(stale.store_cleared(), PayloadSparseTrieStoreOutcome::IgnoredStaleCheckout);
|
||||
|
||||
match cache.state.lock().preserved.as_ref() {
|
||||
Some(PreservedSparseTrie::Anchored { state_root, .. }) => {
|
||||
assert_eq!(*state_root, anchored_state_root);
|
||||
}
|
||||
other => panic!("expected anchored trie to survive stale checkout, got {other:?}"),
|
||||
}
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn stale_checkout_drop_does_not_overwrite_newer_store() {
|
||||
let cache = PayloadSparseTrieCache::default();
|
||||
let parent_state_root = B256::with_last_byte(5);
|
||||
let anchored_state_root = B256::with_last_byte(6);
|
||||
|
||||
let stale = cache.take_or_create_for(parent_state_root);
|
||||
let fresh = cache.take_or_create_for(parent_state_root);
|
||||
|
||||
assert_eq!(
|
||||
fresh.store_anchored(anchored_state_root),
|
||||
PayloadSparseTrieStoreOutcome::Stored
|
||||
);
|
||||
drop(stale);
|
||||
|
||||
match cache.state.lock().preserved.as_ref() {
|
||||
Some(PreservedSparseTrie::Anchored { state_root, .. }) => {
|
||||
assert_eq!(*state_root, anchored_state_root);
|
||||
}
|
||||
other => panic!("expected anchored trie to survive stale checkout drop, got {other:?}"),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -84,7 +84,7 @@ where
|
||||
Evm: ConfigureEvm<Primitives = N> + 'static,
|
||||
{
|
||||
/// Initializes the task with the given transactions pending execution
|
||||
pub(crate) fn new(
|
||||
pub fn new(
|
||||
executor: Runtime,
|
||||
execution_cache: PayloadExecutionCache,
|
||||
ctx: PrewarmContext<N, P, Evm>,
|
||||
|
||||
@@ -7,7 +7,7 @@ use crate::tree::{
|
||||
dispatch_with_chunking, evm_state_to_hashed_post_state, MultiProofMessage,
|
||||
DEFAULT_MAX_TARGETS_FOR_CHUNKING,
|
||||
},
|
||||
payload_processor::{multiproof::MultiProofTaskMetrics, SparseTrieCheckout},
|
||||
payload_processor::multiproof::MultiProofTaskMetrics,
|
||||
};
|
||||
use alloy_primitives::B256;
|
||||
use alloy_rlp::{Decodable, Encodable};
|
||||
@@ -30,7 +30,7 @@ use reth_trie_parallel::{
|
||||
use reth_trie_sparse::debug_recorder::TrieDebugRecorder;
|
||||
use reth_trie_sparse::{
|
||||
errors::SparseTrieResult, ConfigurableSparseTrie, DeferredDrops, LeafUpdate,
|
||||
RevealableSparseTrie,
|
||||
RevealableSparseTrie, SparseStateTrie, SparseTrie,
|
||||
};
|
||||
use revm_primitives::{hash_map::Entry, B256Map};
|
||||
use tracing::{debug, debug_span, error, instrument, trace_span};
|
||||
@@ -39,7 +39,7 @@ use tracing::{debug, debug_span, error, instrument, trace_span};
|
||||
const MAX_PENDING_UPDATES: usize = 100;
|
||||
|
||||
/// Sparse trie task implementation that uses in-memory sparse trie data to schedule proof fetching.
|
||||
pub(super) struct SparseTrieCacheTask {
|
||||
pub(super) struct SparseTrieCacheTask<A = ConfigurableSparseTrie, S = ConfigurableSparseTrie> {
|
||||
/// Sender for proof results.
|
||||
proof_result_tx: CrossbeamSender<ProofResultMessage>,
|
||||
/// Receiver for proof results directly from workers.
|
||||
@@ -47,7 +47,7 @@ pub(super) struct SparseTrieCacheTask {
|
||||
/// Receives updates from execution and prewarming.
|
||||
updates: CrossbeamReceiver<SparseTrieTaskMessage>,
|
||||
/// `SparseStateTrie` used for computing the state root.
|
||||
trie: SparseTrieCheckout,
|
||||
trie: SparseStateTrie<A, S>,
|
||||
/// Handle to the proof worker pools (storage and account).
|
||||
proof_worker_handle: ProofWorkerHandle,
|
||||
|
||||
@@ -110,14 +110,18 @@ pub(super) struct SparseTrieCacheTask {
|
||||
metrics: MultiProofTaskMetrics,
|
||||
}
|
||||
|
||||
impl SparseTrieCacheTask {
|
||||
impl<A, S> SparseTrieCacheTask<A, S>
|
||||
where
|
||||
A: SparseTrie + Default,
|
||||
S: SparseTrie + Default + Clone,
|
||||
{
|
||||
/// Creates a new sparse trie, pre-populating with an existing [`SparseStateTrie`].
|
||||
pub(super) fn new_with_checkout(
|
||||
pub(super) fn new_with_trie(
|
||||
executor: &Runtime,
|
||||
updates: CrossbeamReceiver<MultiProofMessage>,
|
||||
proof_worker_handle: ProofWorkerHandle,
|
||||
metrics: MultiProofTaskMetrics,
|
||||
trie: SparseTrieCheckout,
|
||||
trie: SparseStateTrie<A, S>,
|
||||
chunk_size: usize,
|
||||
) -> Self {
|
||||
let (proof_result_tx, proof_result_rx) = crossbeam_channel::unbounded();
|
||||
@@ -201,7 +205,7 @@ impl SparseTrieCacheTask {
|
||||
max_values_capacity: usize,
|
||||
disable_pruning: bool,
|
||||
updates: &TrieUpdates,
|
||||
) -> (SparseTrieCheckout, DeferredDrops) {
|
||||
) -> (SparseStateTrie<A, S>, DeferredDrops) {
|
||||
let Self { mut trie, .. } = self;
|
||||
trie.commit_updates(updates);
|
||||
if !disable_pruning {
|
||||
@@ -220,7 +224,7 @@ impl SparseTrieCacheTask {
|
||||
self,
|
||||
max_nodes_capacity: usize,
|
||||
max_values_capacity: usize,
|
||||
) -> (SparseTrieCheckout, DeferredDrops) {
|
||||
) -> (SparseStateTrie<A, S>, DeferredDrops) {
|
||||
let Self { mut trie, .. } = self;
|
||||
trie.clear();
|
||||
trie.shrink_to(max_nodes_capacity, max_values_capacity);
|
||||
@@ -302,9 +306,9 @@ impl SparseTrieCacheTask {
|
||||
self.promote_pending_account_updates()?;
|
||||
self.metrics.sparse_trie_process_updates_duration_histogram.record(t.elapsed());
|
||||
|
||||
if self.finished_state_updates
|
||||
&& self.account_updates.is_empty()
|
||||
&& self.storage_updates.iter().all(|(_, updates)| updates.is_empty())
|
||||
if self.finished_state_updates &&
|
||||
self.account_updates.is_empty() &&
|
||||
self.storage_updates.iter().all(|(_, updates)| updates.is_empty())
|
||||
{
|
||||
break;
|
||||
}
|
||||
@@ -596,6 +600,7 @@ impl SparseTrieCacheTask {
|
||||
|
||||
Ok(updates_len_after < updates_len_before)
|
||||
}
|
||||
|
||||
/// Computes storage roots for accounts whose storage updates are fully drained.
|
||||
///
|
||||
/// For each storage trie T that:
|
||||
@@ -616,16 +621,16 @@ impl SparseTrieCacheTask {
|
||||
.filter_map(|(address, updates)| updates.is_empty().then_some(*address))
|
||||
.collect();
|
||||
|
||||
struct SendStorageTriePtr(*mut RevealableSparseTrie<ConfigurableSparseTrie>);
|
||||
struct SendStorageTriePtr<S>(*mut RevealableSparseTrie<S>);
|
||||
// SAFETY: this wrapper only forwards the pointer across rayon; deref invariants are
|
||||
// documented at the use site below.
|
||||
unsafe impl Send for SendStorageTriePtr {}
|
||||
unsafe impl<S: Send> Send for SendStorageTriePtr<S> {}
|
||||
|
||||
let mut tries_to_compute_roots: Vec<(B256, SendStorageTriePtr)> =
|
||||
let mut tries_to_compute_roots: Vec<(B256, SendStorageTriePtr<S>)> =
|
||||
Vec::with_capacity(addresses_to_compute_roots.len());
|
||||
for address in addresses_to_compute_roots {
|
||||
if let Some(trie) = self.trie.storage_tries_mut().get_mut(&address)
|
||||
&& !trie.is_root_cached()
|
||||
if let Some(trie) = self.trie.storage_tries_mut().get_mut(&address) &&
|
||||
!trie.is_root_cached()
|
||||
{
|
||||
tries_to_compute_roots.push((address, SendStorageTriePtr(trie)));
|
||||
}
|
||||
@@ -724,7 +729,7 @@ impl SparseTrieCacheTask {
|
||||
// We need to keep iterating if any updates are being drained because that might
|
||||
// indicate that more pending account updates can be promoted.
|
||||
if num_promoted == 0 || !self.process_account_leaf_updates(false)? {
|
||||
break;
|
||||
break
|
||||
}
|
||||
}
|
||||
|
||||
@@ -845,6 +850,7 @@ pub struct StateRootComputeOutcome {
|
||||
mod tests {
|
||||
use super::*;
|
||||
use alloy_primitives::{keccak256, Address, B256, U256};
|
||||
use reth_trie_sparse::ArenaParallelSparseTrie;
|
||||
|
||||
#[test]
|
||||
fn test_run_hashing_task_hashed_state_update_forwards() {
|
||||
@@ -867,7 +873,10 @@ mod tests {
|
||||
let expected_state = hashed_state.clone();
|
||||
|
||||
let handle = std::thread::spawn(move || {
|
||||
SparseTrieCacheTask::run_hashing_task(updates_rx, hashed_state_tx);
|
||||
SparseTrieCacheTask::<ArenaParallelSparseTrie, ArenaParallelSparseTrie>::run_hashing_task(
|
||||
updates_rx,
|
||||
hashed_state_tx,
|
||||
);
|
||||
});
|
||||
|
||||
updates_tx.send(MultiProofMessage::HashedStateUpdate(hashed_state)).unwrap();
|
||||
|
||||
@@ -4,7 +4,7 @@ use crate::tree::{
|
||||
cached_state::{CacheStats, CachedStateProvider},
|
||||
error::{InsertBlockError, InsertBlockErrorKind, InsertPayloadError},
|
||||
instrumented_state::{InstrumentedStateProvider, StateProviderStats},
|
||||
payload_processor::{EngineSharedCaches, PayloadProcessor},
|
||||
payload_processor::PayloadProcessor,
|
||||
precompile_cache::{CachedPrecompile, CachedPrecompileMetrics, PrecompileCacheMap},
|
||||
sparse_trie::StateRootComputeOutcome,
|
||||
CacheWaitDurations, EngineApiMetrics, EngineApiTreeState, ExecutionEnv, PayloadHandle,
|
||||
@@ -190,13 +190,16 @@ where
|
||||
validator: V,
|
||||
config: TreeConfig,
|
||||
invalid_block_hook: Box<dyn InvalidBlockHook<N>>,
|
||||
shared_caches: EngineSharedCaches<Evm>,
|
||||
changeset_cache: ChangesetCache,
|
||||
runtime: reth_tasks::Runtime,
|
||||
) -> Self {
|
||||
let precompile_cache_map = shared_caches.precompile_cache_map();
|
||||
let payload_processor =
|
||||
PayloadProcessor::new(runtime.clone(), evm_config.clone(), &config, shared_caches);
|
||||
let precompile_cache_map = PrecompileCacheMap::default();
|
||||
let payload_processor = PayloadProcessor::new(
|
||||
runtime.clone(),
|
||||
evm_config.clone(),
|
||||
&config,
|
||||
precompile_cache_map.clone(),
|
||||
);
|
||||
Self {
|
||||
provider,
|
||||
consensus,
|
||||
@@ -310,7 +313,7 @@ where
|
||||
// Validate block consensus rules which includes header validation
|
||||
if let Err(consensus_err) = self.validate_block_inner(&block, None) {
|
||||
// Header validation error takes precedence over execution error
|
||||
return Err(InsertBlockError::new(block, consensus_err.into()).into());
|
||||
return Err(InsertBlockError::new(block, consensus_err.into()).into())
|
||||
}
|
||||
|
||||
// Also validate against the parent
|
||||
@@ -318,7 +321,7 @@ where
|
||||
self.consensus.validate_header_against_parent(block.sealed_header(), parent_block)
|
||||
{
|
||||
// Parent validation error takes precedence over execution error
|
||||
return Err(InsertBlockError::new(block, consensus_err.into()).into());
|
||||
return Err(InsertBlockError::new(block, consensus_err.into()).into())
|
||||
}
|
||||
|
||||
// No header validation errors, return the original execution error
|
||||
@@ -393,7 +396,7 @@ where
|
||||
Ok(val) => val,
|
||||
Err(e) => {
|
||||
let block = convert_to_block(input)?;
|
||||
return Err(InsertBlockError::new(block, e.into()).into());
|
||||
return Err(InsertBlockError::new(block, e.into()).into())
|
||||
}
|
||||
}
|
||||
};
|
||||
@@ -426,7 +429,7 @@ where
|
||||
convert_to_block(input)?,
|
||||
ProviderError::HeaderNotFound(parent_hash.into()).into(),
|
||||
)
|
||||
.into());
|
||||
.into())
|
||||
};
|
||||
let mut state_provider = ensure_ok!(provider_builder.build());
|
||||
drop(_enter);
|
||||
@@ -439,7 +442,7 @@ where
|
||||
convert_to_block(input)?,
|
||||
ProviderError::HeaderNotFound(parent_hash.into()).into(),
|
||||
)
|
||||
.into());
|
||||
.into())
|
||||
};
|
||||
|
||||
let evm_env = debug_span!(target: "engine::tree::payload_validator", "evm_env")
|
||||
@@ -759,7 +762,7 @@ where
|
||||
)
|
||||
.into(),
|
||||
)
|
||||
.into());
|
||||
.into())
|
||||
}
|
||||
|
||||
let timing_stats = state_provider_stats.map(|stats| {
|
||||
@@ -821,14 +824,14 @@ where
|
||||
) -> Result<(), ConsensusError> {
|
||||
if let Err(e) = self.consensus.validate_header(block.sealed_header()) {
|
||||
error!(target: "engine::tree::payload_validator", ?block, "Failed to validate header {}: {e}", block.hash());
|
||||
return Err(e);
|
||||
return Err(e)
|
||||
}
|
||||
|
||||
if let Err(e) =
|
||||
self.consensus.validate_block_pre_execution_with_tx_root(block, transaction_root)
|
||||
{
|
||||
error!(target: "engine::tree::payload_validator", ?block, "Failed to validate block {}: {e}", block.hash());
|
||||
return Err(e);
|
||||
return Err(e)
|
||||
}
|
||||
|
||||
Ok(())
|
||||
@@ -1320,7 +1323,7 @@ where
|
||||
trace!(target: "engine::tree::payload_validator", block=?block.num_hash(), "Validating block consensus");
|
||||
// validate block consensus rules
|
||||
if let Err(e) = self.validate_block_inner(block, transaction_root) {
|
||||
return Err(e.into());
|
||||
return Err(e.into())
|
||||
}
|
||||
|
||||
// now validate against the parent
|
||||
@@ -1329,7 +1332,7 @@ where
|
||||
self.consensus.validate_header_against_parent(block.sealed_header(), parent_block)
|
||||
{
|
||||
warn!(target: "engine::tree::payload_validator", ?block, "Failed to validate header {} against parent: {e}", block.hash());
|
||||
return Err(e.into());
|
||||
return Err(e.into())
|
||||
}
|
||||
drop(_enter);
|
||||
|
||||
@@ -1342,7 +1345,7 @@ where
|
||||
{
|
||||
// call post-block hook
|
||||
self.on_invalid_block(parent_block, block, output, None, ctx.state_mut());
|
||||
return Err(err.into());
|
||||
return Err(err.into())
|
||||
}
|
||||
drop(_enter);
|
||||
|
||||
@@ -1358,7 +1361,7 @@ where
|
||||
{
|
||||
// call post-block hook
|
||||
self.on_invalid_block(parent_block, block, output, None, ctx.state_mut());
|
||||
return Err(err.into());
|
||||
return Err(err.into())
|
||||
}
|
||||
|
||||
// record post-execution validation duration
|
||||
@@ -1466,7 +1469,7 @@ where
|
||||
self.provider.clone(),
|
||||
historical,
|
||||
Some(blocks),
|
||||
)));
|
||||
)))
|
||||
}
|
||||
|
||||
// Check if the block is persisted
|
||||
@@ -1474,7 +1477,7 @@ where
|
||||
debug!(target: "engine::tree::payload_validator", %hash, number = %header.number(), "found canonical state for block in database, creating provider builder");
|
||||
// For persisted blocks, we create a builder that will fetch state directly from the
|
||||
// database
|
||||
return Ok(Some(StateProviderBuilder::new(self.provider.clone(), hash, None)));
|
||||
return Ok(Some(StateProviderBuilder::new(self.provider.clone(), hash, None)))
|
||||
}
|
||||
|
||||
debug!(target: "engine::tree::payload_validator", %hash, "no canonical state found for block");
|
||||
@@ -1506,7 +1509,7 @@ where
|
||||
) {
|
||||
if state.invalid_headers.get(&block.hash()).is_some() {
|
||||
// we already marked this block as invalid
|
||||
return;
|
||||
return
|
||||
}
|
||||
self.invalid_block_hook.on_invalid_block(parent_header, block, output, trie_updates);
|
||||
}
|
||||
|
||||
@@ -203,7 +203,6 @@ impl TestHarness {
|
||||
payload_validator,
|
||||
TreeConfig::default(),
|
||||
Box::new(NoopInvalidBlockHook::default()),
|
||||
EngineSharedCaches::default(),
|
||||
changeset_cache.clone(),
|
||||
reth_tasks::Runtime::test(),
|
||||
);
|
||||
@@ -408,7 +407,6 @@ impl ValidatorTestHarness {
|
||||
payload_validator,
|
||||
TreeConfig::default(),
|
||||
Box::new(NoopInvalidBlockHook::default()),
|
||||
EngineSharedCaches::default(),
|
||||
changeset_cache,
|
||||
reth_tasks::Runtime::test(),
|
||||
);
|
||||
|
||||
@@ -1,27 +0,0 @@
|
||||
//! SDK smoke tests for `EngineSharedCaches`.
|
||||
|
||||
use alloy_primitives::B256;
|
||||
use reth_engine_tree::tree::{
|
||||
EngineSharedCaches, PayloadSparseTrieKind, PayloadSparseTrieStoreOutcome,
|
||||
};
|
||||
use reth_evm_ethereum::EthEvmConfig;
|
||||
|
||||
#[test]
|
||||
fn engine_shared_caches_exposes_public_sparse_trie_sdk() {
|
||||
let caches =
|
||||
EngineSharedCaches::<EthEvmConfig>::with_sparse_trie_kind(PayloadSparseTrieKind::Arena);
|
||||
|
||||
let _precompile_cache_map = caches.precompile_cache_map();
|
||||
|
||||
let sparse_trie_cache = caches.sparse_trie_cache();
|
||||
assert_eq!(sparse_trie_cache.kind(), PayloadSparseTrieKind::Arena);
|
||||
let state_root = B256::with_last_byte(1);
|
||||
|
||||
assert_eq!(
|
||||
sparse_trie_cache.take_or_create_for(state_root).store_anchored(state_root),
|
||||
PayloadSparseTrieStoreOutcome::Stored
|
||||
);
|
||||
|
||||
let checkout = sparse_trie_cache.take_or_create_for(state_root);
|
||||
assert!(checkout.memory_size() > 0 || checkout.retained_storage_tries_count() == 0);
|
||||
}
|
||||
@@ -36,6 +36,8 @@ tracing.workspace = true
|
||||
tempfile.workspace = true
|
||||
|
||||
[features]
|
||||
default = []
|
||||
|
||||
otlp = ["reth-tracing/otlp", "reth-node-core/otlp"]
|
||||
otlp-logs = ["reth-tracing/otlp-logs", "reth-node-core/otlp-logs"]
|
||||
|
||||
@@ -87,3 +89,6 @@ min-trace-logs = [
|
||||
"tracing/release_max_level_trace",
|
||||
"reth-node-core/min-trace-logs",
|
||||
]
|
||||
|
||||
rocksdb = ["reth-cli-commands/rocksdb"]
|
||||
edge = ["rocksdb"]
|
||||
|
||||
@@ -85,7 +85,4 @@ serde = [
|
||||
"alloy-rpc-types-eth?/serde",
|
||||
"rand/serde",
|
||||
]
|
||||
rpc = [
|
||||
"dep:alloy-rpc-types-eth",
|
||||
"alloy-rpc-types-eth?/serde",
|
||||
]
|
||||
rpc = ["dep:alloy-rpc-types-eth"]
|
||||
|
||||
@@ -66,6 +66,8 @@ secp256k1.workspace = true
|
||||
tempfile.workspace = true
|
||||
|
||||
[features]
|
||||
default = []
|
||||
edge = ["reth-provider/edge"]
|
||||
serde = [
|
||||
"reth-exex-types/serde",
|
||||
"reth-revm/serde",
|
||||
|
||||
@@ -63,28 +63,6 @@ impl EthStreamError {
|
||||
}
|
||||
}
|
||||
|
||||
/// Returns whether this error indicates a protocol breach on the receive side.
|
||||
///
|
||||
/// These are errors caused by the remote peer sending invalid or malformed data
|
||||
/// that warrant disconnecting with [`DisconnectReason::ProtocolBreach`].
|
||||
pub const fn is_protocol_breach(&self) -> bool {
|
||||
matches!(
|
||||
self,
|
||||
Self::InvalidMessage(_) |
|
||||
Self::MessageTooBig(_) |
|
||||
Self::TransactionHashesInvalidLenOfFields { .. } |
|
||||
Self::UnsupportedMessage { .. } |
|
||||
Self::P2PStreamError(
|
||||
P2PStreamError::Rlp(_) |
|
||||
P2PStreamError::Snap(_) |
|
||||
P2PStreamError::MessageTooBig { .. } |
|
||||
P2PStreamError::UnknownReservedMessageId(_) |
|
||||
P2PStreamError::EmptyProtocolMessage |
|
||||
P2PStreamError::UnknownDisconnectReason(_)
|
||||
)
|
||||
)
|
||||
}
|
||||
|
||||
/// Returns the [`io::Error`] if it was caused by IO
|
||||
pub const fn as_io(&self) -> Option<&io::Error> {
|
||||
if let Self::P2PStreamError(P2PStreamError::Io(io)) = self {
|
||||
|
||||
@@ -21,7 +21,7 @@ alloy-eip2124.workspace = true
|
||||
# misc
|
||||
serde = { workspace = true, optional = true }
|
||||
humantime-serde = { workspace = true, optional = true }
|
||||
serde_json = { workspace = true, features = ["std"], optional = true }
|
||||
serde_json = { workspace = true, features = ["std"] }
|
||||
|
||||
# misc
|
||||
tracing.workspace = true
|
||||
@@ -30,7 +30,6 @@ tracing.workspace = true
|
||||
serde = [
|
||||
"dep:serde",
|
||||
"dep:humantime-serde",
|
||||
"dep:serde_json",
|
||||
"alloy-eip2124/serde",
|
||||
]
|
||||
test-utils = []
|
||||
|
||||
@@ -1,9 +1,15 @@
|
||||
//! Configuration for peering.
|
||||
|
||||
use std::{collections::HashSet, time::Duration};
|
||||
use std::{
|
||||
collections::HashSet,
|
||||
io::{self, ErrorKind},
|
||||
path::Path,
|
||||
time::Duration,
|
||||
};
|
||||
|
||||
use reth_net_banlist::{BanList, IpFilter};
|
||||
use reth_network_peers::{NodeRecord, TrustedPeer};
|
||||
use tracing::info;
|
||||
|
||||
use crate::{peers::PersistedPeerInfo, BackoffKind, ReputationChangeWeights};
|
||||
|
||||
@@ -305,16 +311,16 @@ impl PeersConfig {
|
||||
#[cfg(feature = "serde")]
|
||||
pub fn with_basic_nodes_from_file(
|
||||
mut self,
|
||||
optional_file: Option<impl AsRef<std::path::Path>>,
|
||||
) -> Result<Self, std::io::Error> {
|
||||
optional_file: Option<impl AsRef<Path>>,
|
||||
) -> Result<Self, io::Error> {
|
||||
let Some(file_path) = optional_file else { return Ok(self) };
|
||||
let raw = match std::fs::read_to_string(file_path.as_ref()) {
|
||||
Ok(contents) => contents,
|
||||
Err(e) if e.kind() == std::io::ErrorKind::NotFound => return Ok(self),
|
||||
Err(e) if e.kind() == ErrorKind::NotFound => return Ok(self),
|
||||
Err(e) => return Err(e),
|
||||
};
|
||||
|
||||
tracing::info!(target: "net::peers", file = %file_path.as_ref().display(), "Loading saved peers");
|
||||
info!(target: "net::peers", file = %file_path.as_ref().display(), "Loading saved peers");
|
||||
|
||||
// Try the new format first, fall back to legacy Vec<NodeRecord>
|
||||
let peers: Vec<PersistedPeerInfo> = serde_json::from_str(&raw)
|
||||
@@ -324,9 +330,9 @@ impl PeersConfig {
|
||||
nodes.into_iter().map(PersistedPeerInfo::from_node_record).collect(),
|
||||
)
|
||||
})
|
||||
.map_err(|e| std::io::Error::new(std::io::ErrorKind::InvalidData, e))?;
|
||||
.map_err(|e| io::Error::new(io::ErrorKind::InvalidData, e))?;
|
||||
|
||||
tracing::info!(target: "net::peers", count = peers.len(), "Loaded persisted peers");
|
||||
info!(target: "net::peers", count = peers.len(), "Loaded persisted peers");
|
||||
self.persisted_peers = peers;
|
||||
Ok(self)
|
||||
}
|
||||
|
||||
@@ -742,9 +742,7 @@ impl<N: NetworkPrimitives> Future for ActiveSession<N> {
|
||||
}
|
||||
OnIncomingMessageOutcome::BadMessage { error, message } => {
|
||||
debug!(target: "net::session", %error, msg=?message, remote_peer_id=?this.remote_peer_id, "received invalid protocol message");
|
||||
this.on_bad_message();
|
||||
return this
|
||||
.try_disconnect(DisconnectReason::ProtocolBreach, cx)
|
||||
return this.close_on_error(error, cx)
|
||||
}
|
||||
OnIncomingMessageOutcome::NoCapacity(msg) => {
|
||||
// failed to send due to lack of capacity
|
||||
@@ -754,10 +752,6 @@ impl<N: NetworkPrimitives> Future for ActiveSession<N> {
|
||||
}
|
||||
Err(err) => {
|
||||
debug!(target: "net::session", %err, remote_peer_id=?this.remote_peer_id, "failed to receive message");
|
||||
if err.is_protocol_breach() {
|
||||
this.on_bad_message();
|
||||
return this.try_disconnect(DisconnectReason::ProtocolBreach, cx)
|
||||
}
|
||||
return this.close_on_error(err, cx)
|
||||
}
|
||||
}
|
||||
@@ -972,7 +966,6 @@ mod tests {
|
||||
GetBlockBodies, HelloMessageWithProtocols, P2PStream, StatusBuilder, UnauthedEthStream,
|
||||
UnauthedP2PStream, UnifiedStatus,
|
||||
};
|
||||
use reth_eth_wire_types::{EthMessageID, RawCapabilityMessage};
|
||||
use reth_ethereum_forks::EthereumHardfork;
|
||||
use reth_network_peers::pk2id;
|
||||
use reth_network_types::session::config::PROTOCOL_BREACH_REQUEST_TIMEOUT;
|
||||
@@ -1168,40 +1161,6 @@ mod tests {
|
||||
fut.await;
|
||||
}
|
||||
|
||||
#[tokio::test(flavor = "multi_thread")]
|
||||
async fn test_invalid_message_disconnects_with_protocol_breach() {
|
||||
let mut builder = SessionBuilder::default();
|
||||
|
||||
let listener = TcpListener::bind("127.0.0.1:0").await.unwrap();
|
||||
let local_addr = listener.local_addr().unwrap();
|
||||
|
||||
let fut = builder.with_client_stream(local_addr, async move |mut client_stream| {
|
||||
client_stream
|
||||
.start_send_raw(RawCapabilityMessage::eth(
|
||||
EthMessageID::PooledTransactions,
|
||||
vec![0xc0].into(),
|
||||
))
|
||||
.unwrap();
|
||||
client_stream.flush().await.unwrap();
|
||||
|
||||
let msg = client_stream.next().await.unwrap().unwrap_err();
|
||||
assert_eq!(msg.as_disconnected(), Some(DisconnectReason::ProtocolBreach));
|
||||
});
|
||||
|
||||
let (tx, rx) = oneshot::channel();
|
||||
|
||||
tokio::task::spawn(async move {
|
||||
let (incoming, _) = listener.accept().await.unwrap();
|
||||
let session = builder.connect_incoming(incoming).await;
|
||||
session.await;
|
||||
|
||||
tx.send(()).unwrap();
|
||||
});
|
||||
|
||||
fut.await;
|
||||
rx.await.unwrap();
|
||||
}
|
||||
|
||||
#[tokio::test(flavor = "multi_thread")]
|
||||
async fn handle_dropped_stream() {
|
||||
let mut builder = SessionBuilder::default();
|
||||
|
||||
@@ -442,12 +442,8 @@ impl<N: NetworkPrimitives> TransactionFetcher<N> {
|
||||
search_durations.find_idle_peer
|
||||
);
|
||||
|
||||
// peer may have disconnected between idle check and here, re-buffer hashes so they
|
||||
// aren't lost from the pending fetch cache
|
||||
let Some(peer) = peers.get(&peer_id) else {
|
||||
self.buffer_hashes(hashes_to_request, None);
|
||||
return false
|
||||
};
|
||||
// peer should always exist since `is_session_active` already checked
|
||||
let Some(peer) = peers.get(&peer_id) else { return false };
|
||||
let conn_eth_version = peer.version;
|
||||
|
||||
// fill the request with more hashes pending fetch that have been announced by the peer.
|
||||
@@ -1453,26 +1449,6 @@ mod test {
|
||||
)
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn on_fetch_pending_hashes_rebuffers_on_disconnected_peer() {
|
||||
let tx_fetcher = &mut TransactionFetcher::default();
|
||||
let peer_1 = PeerId::new([1; 64]);
|
||||
let peer_2 = PeerId::new([2; 64]);
|
||||
let hash_1 = B256::from_slice(&[1; 32]);
|
||||
|
||||
buffer_hash_to_tx_fetcher(tx_fetcher, hash_1, peer_1, 0, Some(128));
|
||||
buffer_hash_to_tx_fetcher(tx_fetcher, hash_1, peer_2, 0, Some(128));
|
||||
|
||||
assert_eq!(tx_fetcher.num_pending_hashes(), 1);
|
||||
|
||||
// pass empty peers map — both peers are "disconnected"
|
||||
let peers = HashMap::new();
|
||||
tx_fetcher.on_fetch_pending_hashes(&peers, |_| true);
|
||||
|
||||
// hash should be re-buffered, not lost
|
||||
assert_eq!(tx_fetcher.num_pending_hashes(), 1);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn verify_response_hashes() {
|
||||
let input = hex!(
|
||||
|
||||
@@ -460,14 +460,6 @@ impl<Pool: TransactionPool, N: NetworkPrimitives> TransactionsManager<Pool, N> {
|
||||
self.network.reputation_change(peer_id, ReputationChangeKind::AlreadySeenTransaction);
|
||||
}
|
||||
|
||||
/// Handles a closed peer session, removing the peer from transaction-local tracking state.
|
||||
fn on_peer_session_closed(&mut self, peer_id: &PeerId) {
|
||||
if let Some(mut peer) = self.peers.remove(peer_id) {
|
||||
self.policies.propagation_policy_mut().on_session_closed(&mut peer);
|
||||
}
|
||||
self.transaction_fetcher.remove_peer(peer_id);
|
||||
}
|
||||
|
||||
/// Clear the transaction
|
||||
fn on_good_import(&mut self, hash: TxHash) {
|
||||
self.transactions_by_peers.remove(&hash);
|
||||
@@ -1254,7 +1246,13 @@ where
|
||||
fn on_network_event(&mut self, event_result: NetworkEvent<PeerRequest<N>>) {
|
||||
match event_result {
|
||||
NetworkEvent::Peer(PeerEvent::SessionClosed { peer_id, .. }) => {
|
||||
self.on_peer_session_closed(&peer_id);
|
||||
// remove the peer
|
||||
|
||||
let peer = self.peers.remove(&peer_id);
|
||||
if let Some(mut peer) = peer {
|
||||
self.policies.propagation_policy_mut().on_session_closed(&mut peer);
|
||||
}
|
||||
self.transaction_fetcher.remove_peer(&peer_id);
|
||||
}
|
||||
NetworkEvent::ActivePeerSession { info, messages } => {
|
||||
// process active peer session and broadcast available transaction from the pool
|
||||
@@ -2169,7 +2167,7 @@ mod tests {
|
||||
NetworkConfigBuilder, NetworkManager,
|
||||
};
|
||||
use alloy_consensus::{TxEip1559, TxLegacy};
|
||||
use alloy_primitives::{hex, Signature, TxKind, B256, U256};
|
||||
use alloy_primitives::{hex, Signature, TxKind, U256};
|
||||
use alloy_rlp::Decodable;
|
||||
use futures::FutureExt;
|
||||
use reth_chainspec::MIN_TRANSACTION_GAS;
|
||||
@@ -2513,46 +2511,6 @@ mod tests {
|
||||
handle.terminate().await;
|
||||
}
|
||||
|
||||
#[tokio::test(flavor = "multi_thread")]
|
||||
async fn test_session_closed_cleans_transaction_peer_state() {
|
||||
let (mut tx_manager, _network) = new_tx_manager().await;
|
||||
let peer_id = PeerId::new([1; 64]);
|
||||
let fallback_peer = PeerId::new([2; 64]);
|
||||
let (peer, _) = new_mock_session(peer_id, EthVersion::Eth66);
|
||||
let hash_shared = B256::from_slice(&[1; 32]);
|
||||
|
||||
tx_manager.peers.insert(peer_id, peer);
|
||||
buffer_hash_to_tx_fetcher(
|
||||
&mut tx_manager.transaction_fetcher,
|
||||
hash_shared,
|
||||
peer_id,
|
||||
0,
|
||||
None,
|
||||
);
|
||||
buffer_hash_to_tx_fetcher(
|
||||
&mut tx_manager.transaction_fetcher,
|
||||
hash_shared,
|
||||
fallback_peer,
|
||||
0,
|
||||
None,
|
||||
);
|
||||
tx_manager.transaction_fetcher.active_peers.insert(peer_id, 1);
|
||||
|
||||
tx_manager.on_network_event(NetworkEvent::Peer(PeerEvent::SessionClosed {
|
||||
peer_id,
|
||||
reason: None,
|
||||
}));
|
||||
|
||||
// peer removed from peers map and active_peers
|
||||
assert!(!tx_manager.peers.contains_key(&peer_id));
|
||||
assert!(tx_manager.transaction_fetcher.active_peers.peek(&peer_id).is_none());
|
||||
// fallback peer is still available for the hash
|
||||
assert_eq!(
|
||||
tx_manager.transaction_fetcher.get_idle_peer_for(hash_shared),
|
||||
Some(&fallback_peer)
|
||||
);
|
||||
}
|
||||
|
||||
#[tokio::test(flavor = "multi_thread")]
|
||||
async fn test_on_get_pooled_transactions_network() {
|
||||
reth_tracing::init_test_tracing();
|
||||
|
||||
@@ -32,7 +32,7 @@ use reth_node_core::{
|
||||
primitives::Head,
|
||||
};
|
||||
use reth_provider::{
|
||||
providers::{BlockchainProvider, NodeTypesForProvider, RocksDBProvider},
|
||||
providers::{BlockchainProvider, NodeTypesForProvider},
|
||||
ChainSpecProvider, FullProvider,
|
||||
};
|
||||
use reth_tasks::TaskExecutor;
|
||||
@@ -154,14 +154,12 @@ pub struct NodeBuilder<DB, ChainSpec> {
|
||||
config: NodeConfig<ChainSpec>,
|
||||
/// The configured database for the node.
|
||||
database: DB,
|
||||
/// An optional [`RocksDBProvider`] to use instead of creating one during launch.
|
||||
rocksdb_provider: Option<RocksDBProvider>,
|
||||
}
|
||||
|
||||
impl<ChainSpec> NodeBuilder<(), ChainSpec> {
|
||||
/// Create a new [`NodeBuilder`].
|
||||
pub const fn new(config: NodeConfig<ChainSpec>) -> Self {
|
||||
Self { config, database: (), rocksdb_provider: None }
|
||||
Self { config, database: () }
|
||||
}
|
||||
}
|
||||
|
||||
@@ -230,13 +228,7 @@ impl<DB, ChainSpec> NodeBuilder<DB, ChainSpec> {
|
||||
impl<DB, ChainSpec: EthChainSpec> NodeBuilder<DB, ChainSpec> {
|
||||
/// Configures the underlying database that the node will use.
|
||||
pub fn with_database<D>(self, database: D) -> NodeBuilder<D, ChainSpec> {
|
||||
NodeBuilder { config: self.config, database, rocksdb_provider: self.rocksdb_provider }
|
||||
}
|
||||
|
||||
/// Sets the [`RocksDBProvider`] to use instead of creating one during launch.
|
||||
pub fn with_rocksdb_provider(mut self, rocksdb_provider: RocksDBProvider) -> Self {
|
||||
self.rocksdb_provider = Some(rocksdb_provider);
|
||||
self
|
||||
NodeBuilder { config: self.config, database }
|
||||
}
|
||||
|
||||
/// Preconfigure the builder with the context to launch the node.
|
||||
@@ -305,7 +297,7 @@ where
|
||||
T: NodeTypesForProvider<ChainSpec = ChainSpec>,
|
||||
P: FullProvider<NodeTypesWithDBAdapter<T, DB>>,
|
||||
{
|
||||
NodeBuilderWithTypes::new(self.config, self.database, self.rocksdb_provider)
|
||||
NodeBuilderWithTypes::new(self.config, self.database)
|
||||
}
|
||||
|
||||
/// Preconfigures the node with a specific node implementation.
|
||||
@@ -355,12 +347,6 @@ where
|
||||
DB: Database + DatabaseMetrics + Clone + Unpin + 'static,
|
||||
ChainSpec: EthChainSpec + EthereumHardforks,
|
||||
{
|
||||
/// Sets the [`RocksDBProvider`] to use instead of creating one during launch.
|
||||
pub fn with_rocksdb_provider(mut self, rocksdb_provider: RocksDBProvider) -> Self {
|
||||
self.builder.rocksdb_provider = Some(rocksdb_provider);
|
||||
self
|
||||
}
|
||||
|
||||
/// Configures the types of the node.
|
||||
pub fn with_types<T>(self) -> WithLaunchContext<NodeBuilderWithTypes<RethFullAdapter<DB, T>>>
|
||||
where
|
||||
|
||||
@@ -16,7 +16,6 @@ use crate::{
|
||||
use reth_exex::ExExContext;
|
||||
use reth_node_api::{FullNodeComponents, FullNodeTypes, NodeAddOns, NodeTypes};
|
||||
use reth_node_core::node_config::NodeConfig;
|
||||
use reth_provider::providers::RocksDBProvider;
|
||||
use reth_tasks::TaskExecutor;
|
||||
use std::{fmt, fmt::Debug, future::Future};
|
||||
|
||||
@@ -26,8 +25,6 @@ pub struct NodeBuilderWithTypes<T: FullNodeTypes> {
|
||||
config: NodeConfig<<T::Types as NodeTypes>::ChainSpec>,
|
||||
/// The configured database for the node.
|
||||
adapter: NodeTypesAdapter<T>,
|
||||
/// An optional [`RocksDBProvider`] to use instead of creating one during launch.
|
||||
rocksdb_provider: Option<RocksDBProvider>,
|
||||
}
|
||||
|
||||
impl<T: FullNodeTypes> NodeBuilderWithTypes<T> {
|
||||
@@ -35,9 +32,8 @@ impl<T: FullNodeTypes> NodeBuilderWithTypes<T> {
|
||||
pub const fn new(
|
||||
config: NodeConfig<<T::Types as NodeTypes>::ChainSpec>,
|
||||
database: T::DB,
|
||||
rocksdb_provider: Option<RocksDBProvider>,
|
||||
) -> Self {
|
||||
Self { config, adapter: NodeTypesAdapter::new(database), rocksdb_provider }
|
||||
Self { config, adapter: NodeTypesAdapter::new(database) }
|
||||
}
|
||||
|
||||
/// Advances the state of the node builder to the next state where all components are configured
|
||||
@@ -45,12 +41,11 @@ impl<T: FullNodeTypes> NodeBuilderWithTypes<T> {
|
||||
where
|
||||
CB: NodeComponentsBuilder<T>,
|
||||
{
|
||||
let Self { config, adapter, rocksdb_provider } = self;
|
||||
let Self { config, adapter } = self;
|
||||
|
||||
NodeBuilderWithComponents {
|
||||
config,
|
||||
adapter,
|
||||
rocksdb_provider,
|
||||
components_builder,
|
||||
add_ons: AddOns { hooks: NodeHooks::default(), exexs: Vec::new(), add_ons: () },
|
||||
}
|
||||
@@ -155,8 +150,6 @@ pub struct NodeBuilderWithComponents<
|
||||
pub config: NodeConfig<<T::Types as NodeTypes>::ChainSpec>,
|
||||
/// Adapter for the underlying node types and database
|
||||
pub adapter: NodeTypesAdapter<T>,
|
||||
/// An optional [`RocksDBProvider`] to use instead of creating one during launch.
|
||||
pub rocksdb_provider: Option<RocksDBProvider>,
|
||||
/// container for type specific components
|
||||
pub components_builder: CB,
|
||||
/// Additional node extensions.
|
||||
@@ -174,12 +167,11 @@ where
|
||||
where
|
||||
AO: NodeAddOns<NodeAdapter<T, CB::Components>>,
|
||||
{
|
||||
let Self { config, adapter, rocksdb_provider, components_builder, .. } = self;
|
||||
let Self { config, adapter, components_builder, .. } = self;
|
||||
|
||||
NodeBuilderWithComponents {
|
||||
config,
|
||||
adapter,
|
||||
rocksdb_provider,
|
||||
components_builder,
|
||||
add_ons: AddOns { hooks: NodeHooks::default(), exexs: Vec::new(), add_ons },
|
||||
}
|
||||
|
||||
@@ -472,7 +472,6 @@ where
|
||||
pub async fn create_provider_factory<N, Evm>(
|
||||
&self,
|
||||
changeset_cache: ChangesetCache,
|
||||
rocksdb_provider: Option<RocksDBProvider>,
|
||||
) -> eyre::Result<ProviderFactory<N>>
|
||||
where
|
||||
N: ProviderNodeTypes<DB = DB, ChainSpec = ChainSpec>,
|
||||
@@ -490,16 +489,12 @@ where
|
||||
.with_genesis_block_number(self.chain_spec().genesis().number.unwrap_or_default())
|
||||
.build()?;
|
||||
|
||||
// Use the provided RocksDB provider or create a new one
|
||||
let rocksdb_provider = if let Some(provider) = rocksdb_provider {
|
||||
provider
|
||||
} else {
|
||||
RocksDBProvider::builder(self.data_dir().rocksdb())
|
||||
.with_default_tables()
|
||||
.with_metrics()
|
||||
.with_statistics()
|
||||
.build()?
|
||||
};
|
||||
// Initialize RocksDB provider with metrics, statistics, and default tables
|
||||
let rocksdb_provider = RocksDBProvider::builder(self.data_dir().rocksdb())
|
||||
.with_default_tables()
|
||||
.with_metrics()
|
||||
.with_statistics()
|
||||
.build()?;
|
||||
|
||||
let factory = ProviderFactory::new(
|
||||
self.right().clone(),
|
||||
@@ -578,14 +573,12 @@ where
|
||||
pub async fn with_provider_factory<N, Evm>(
|
||||
self,
|
||||
changeset_cache: ChangesetCache,
|
||||
rocksdb_provider: Option<RocksDBProvider>,
|
||||
) -> eyre::Result<LaunchContextWith<Attached<WithConfigs<ChainSpec>, ProviderFactory<N>>>>
|
||||
where
|
||||
N: ProviderNodeTypes<DB = DB, ChainSpec = ChainSpec>,
|
||||
Evm: ConfigureEvm<Primitives = N::Primitives> + 'static,
|
||||
{
|
||||
let factory =
|
||||
self.create_provider_factory::<N, Evm>(changeset_cache, rocksdb_provider).await?;
|
||||
let factory = self.create_provider_factory::<N, Evm>(changeset_cache).await?;
|
||||
let ctx = LaunchContextWith {
|
||||
inner: self.inner,
|
||||
attachment: self.attachment.map_right(|_| factory),
|
||||
|
||||
@@ -15,7 +15,7 @@ use reth_engine_tree::{
|
||||
chain::{ChainEvent, FromOrchestrator},
|
||||
engine::{EngineApiKind, EngineApiRequest, EngineRequestHandler},
|
||||
launch::build_engine_orchestrator,
|
||||
tree::{EngineSharedCaches, PayloadSparseTrieKind, TreeConfig},
|
||||
tree::TreeConfig,
|
||||
};
|
||||
use reth_engine_util::EngineMessageStreamExt;
|
||||
use reth_exex::ExExManagerHandle;
|
||||
@@ -81,7 +81,6 @@ impl EngineNodeLauncher {
|
||||
let Self { ctx, engine_tree_config } = self;
|
||||
let NodeBuilderWithComponents {
|
||||
adapter: NodeTypesAdapter { database },
|
||||
rocksdb_provider,
|
||||
components_builder,
|
||||
add_ons: AddOns { hooks, exexs: installed_exex, add_ons },
|
||||
config,
|
||||
@@ -90,10 +89,6 @@ impl EngineNodeLauncher {
|
||||
|
||||
// Create changeset cache that will be shared across the engine
|
||||
let changeset_cache = ChangesetCache::new();
|
||||
let main_shared_caches =
|
||||
EngineSharedCaches::<<CB::Components as NodeComponents<T>>::Evm>::with_sparse_trie_kind(
|
||||
PayloadSparseTrieKind::from(engine_tree_config.enable_arena_sparse_trie()),
|
||||
);
|
||||
|
||||
// setup the launch context
|
||||
let ctx = ctx
|
||||
@@ -107,7 +102,7 @@ impl EngineNodeLauncher {
|
||||
// ensure certain settings take effect
|
||||
.with_adjusted_configs()
|
||||
// Create the provider factory with changeset cache
|
||||
.with_provider_factory::<_, <CB::Components as NodeComponents<T>>::Evm>(changeset_cache.clone(), rocksdb_provider).await?
|
||||
.with_provider_factory::<_, <CB::Components as NodeComponents<T>>::Evm>(changeset_cache.clone()).await?
|
||||
.inspect(|_| {
|
||||
info!(target: "reth::cli", "Database opened");
|
||||
})
|
||||
@@ -127,8 +122,7 @@ impl EngineNodeLauncher {
|
||||
.with_blockchain_db::<T, _>(move |provider_factory| {
|
||||
Ok(BlockchainProvider::new(provider_factory)?)
|
||||
})?
|
||||
.with_components(components_builder, on_component_initialized)
|
||||
.await?;
|
||||
.with_components(components_builder, on_component_initialized).await?;
|
||||
|
||||
// spawn exexs if any
|
||||
let maybe_exex_manager_handle = ctx.launch_exex(installed_exex).await?;
|
||||
@@ -199,12 +193,7 @@ impl EngineNodeLauncher {
|
||||
// Build the engine validator with all required components
|
||||
let engine_validator = validator_builder
|
||||
.clone()
|
||||
.build_tree_validator_with_caches(
|
||||
&add_ons_ctx,
|
||||
engine_tree_config.clone(),
|
||||
changeset_cache.clone(),
|
||||
main_shared_caches.clone(),
|
||||
)
|
||||
.build_tree_validator(&add_ons_ctx, engine_tree_config.clone(), changeset_cache.clone())
|
||||
.await?;
|
||||
|
||||
// Create the consensus engine stream with optional reorg
|
||||
@@ -217,18 +206,8 @@ impl EngineNodeLauncher {
|
||||
|| async {
|
||||
// Create a separate cache for reorg validator (not shared with main engine)
|
||||
let reorg_cache = ChangesetCache::new();
|
||||
let reorg_shared_caches = EngineSharedCaches::<
|
||||
<CB::Components as NodeComponents<T>>::Evm,
|
||||
>::with_sparse_trie_kind(
|
||||
PayloadSparseTrieKind::from(engine_tree_config.enable_arena_sparse_trie()),
|
||||
);
|
||||
validator_builder
|
||||
.build_tree_validator_with_caches(
|
||||
&add_ons_ctx,
|
||||
engine_tree_config.clone(),
|
||||
reorg_cache,
|
||||
reorg_shared_caches,
|
||||
)
|
||||
.build_tree_validator(&add_ons_ctx, engine_tree_config.clone(), reorg_cache)
|
||||
.await
|
||||
},
|
||||
node_config.debug.reorg_frequency,
|
||||
|
||||
@@ -1,8 +1,8 @@
|
||||
//! Builder support for rpc components.
|
||||
|
||||
pub use jsonrpsee::server::middleware::rpc::{RpcService, RpcServiceBuilder};
|
||||
use reth_engine_tree::tree::WaitForCaches;
|
||||
pub use reth_engine_tree::tree::{BasicEngineValidator, EngineValidator};
|
||||
use reth_engine_tree::tree::{EngineSharedCaches, PayloadSparseTrieKind, WaitForCaches};
|
||||
pub use reth_rpc_builder::{middleware::RethRpcMiddleware, Identity, Stack};
|
||||
pub use reth_trie_db::ChangesetCache;
|
||||
|
||||
@@ -981,8 +981,7 @@ where
|
||||
let Self { eth_api_builder, engine_api_builder, hooks, .. } = self;
|
||||
|
||||
let engine_api = engine_api_builder.build_engine_api(&ctx).await?;
|
||||
let AddOnsContext { node, config, beacon_engine_handle, jwt_secret, engine_events, .. } =
|
||||
ctx;
|
||||
let AddOnsContext { node, config, beacon_engine_handle, jwt_secret, engine_events } = ctx;
|
||||
|
||||
info!(target: "reth::cli", "Engine API handler initialized");
|
||||
|
||||
@@ -1295,25 +1294,6 @@ pub trait EngineValidatorBuilder<Node: FullNodeComponents>: Send + Sync + Clone
|
||||
tree_config: TreeConfig,
|
||||
changeset_cache: ChangesetCache,
|
||||
) -> impl Future<Output = eyre::Result<Self::EngineValidator>> + Send;
|
||||
|
||||
/// Builds the tree validator using the shared cache handles exported by the launcher.
|
||||
///
|
||||
/// The default implementation preserves the legacy behavior and ignores the provided caches.
|
||||
fn build_tree_validator_with_caches(
|
||||
self,
|
||||
ctx: &AddOnsContext<'_, Node>,
|
||||
tree_config: TreeConfig,
|
||||
changeset_cache: ChangesetCache,
|
||||
shared_caches: EngineSharedCaches<Node::Evm>,
|
||||
) -> impl Future<Output = eyre::Result<Self::EngineValidator>> + Send
|
||||
where
|
||||
Self: Sized,
|
||||
{
|
||||
async move {
|
||||
let _ = shared_caches;
|
||||
self.build_tree_validator(ctx, tree_config, changeset_cache).await
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Basic implementation of [`EngineValidatorBuilder`].
|
||||
@@ -1361,20 +1341,6 @@ where
|
||||
ctx: &AddOnsContext<'_, Node>,
|
||||
tree_config: TreeConfig,
|
||||
changeset_cache: ChangesetCache,
|
||||
) -> eyre::Result<Self::EngineValidator> {
|
||||
let shared_caches = EngineSharedCaches::with_sparse_trie_kind(PayloadSparseTrieKind::from(
|
||||
tree_config.enable_arena_sparse_trie(),
|
||||
));
|
||||
self.build_tree_validator_with_caches(ctx, tree_config, changeset_cache, shared_caches)
|
||||
.await
|
||||
}
|
||||
|
||||
async fn build_tree_validator_with_caches(
|
||||
self,
|
||||
ctx: &AddOnsContext<'_, Node>,
|
||||
tree_config: TreeConfig,
|
||||
changeset_cache: ChangesetCache,
|
||||
shared_caches: EngineSharedCaches<Node::Evm>,
|
||||
) -> eyre::Result<Self::EngineValidator> {
|
||||
let validator = self.payload_validator_builder.build(ctx).await?;
|
||||
let data_dir = ctx.config.datadir.clone().resolve_datadir(ctx.config.chain.chain());
|
||||
@@ -1387,7 +1353,6 @@ where
|
||||
validator,
|
||||
tree_config,
|
||||
invalid_block_hook,
|
||||
shared_caches,
|
||||
changeset_cache,
|
||||
ctx.node.task_executor().clone(),
|
||||
))
|
||||
|
||||
@@ -93,6 +93,12 @@ min-trace-logs = ["tracing/release_max_level_trace"]
|
||||
# Debug recording for sparse trie mutations
|
||||
trie-debug = ["reth-engine-primitives/trie-debug"]
|
||||
|
||||
# Route supported tables to RocksDB instead of MDBX
|
||||
rocksdb = ["reth-storage-api/rocksdb"]
|
||||
|
||||
# Marker feature for edge/unstable builds - enables rocksdb and sets v2 defaults
|
||||
edge = ["rocksdb"]
|
||||
|
||||
[build-dependencies]
|
||||
vergen = { workspace = true, features = ["build", "cargo", "emit_and_set"] }
|
||||
vergen-git2.workspace = true
|
||||
|
||||
@@ -42,10 +42,13 @@ rayon.workspace = true
|
||||
tokio.workspace = true
|
||||
rustc-hash.workspace = true
|
||||
|
||||
[features]
|
||||
rocksdb = ["reth-provider/rocksdb"]
|
||||
|
||||
[dev-dependencies]
|
||||
# reth
|
||||
reth-db = { workspace = true, features = ["test-utils"] }
|
||||
reth-stages = { workspace = true, features = ["test-utils"] }
|
||||
reth-stages = { workspace = true, features = ["test-utils", "rocksdb"] }
|
||||
reth-primitives-traits = { workspace = true, features = ["arbitrary"] }
|
||||
reth-testing-utils.workspace = true
|
||||
reth-tracing.workspace = true
|
||||
|
||||
@@ -74,6 +74,7 @@ where
|
||||
let range_end = *range.end();
|
||||
|
||||
// Check where account history indices are stored
|
||||
#[cfg(all(unix, feature = "rocksdb"))]
|
||||
if provider.cached_storage_settings().storage_v2 {
|
||||
return self.prune_rocksdb(provider, input, range, range_end);
|
||||
}
|
||||
@@ -231,6 +232,7 @@ impl AccountHistory {
|
||||
///
|
||||
/// Reads account changesets from static files and prunes the corresponding
|
||||
/// `RocksDB` history shards.
|
||||
#[cfg(all(unix, feature = "rocksdb"))]
|
||||
fn prune_rocksdb<Provider>(
|
||||
&self,
|
||||
provider: &Provider,
|
||||
@@ -504,6 +506,157 @@ mod tests {
|
||||
test_prune(1400, 3, (PruneProgress::Finished, 804));
|
||||
}
|
||||
|
||||
/// Tests the `prune_static_files` code path. On unix with rocksdb feature, v2 storage
|
||||
/// routes to `prune_rocksdb` instead, so this test only runs without rocksdb (the
|
||||
/// `prune_rocksdb_path` test covers that configuration).
|
||||
#[test]
|
||||
#[cfg(not(all(unix, feature = "rocksdb")))]
|
||||
fn prune_static_file() {
|
||||
let db = TestStageDB::default();
|
||||
let mut rng = generators::rng();
|
||||
|
||||
let blocks = random_block_range(
|
||||
&mut rng,
|
||||
0..=5000,
|
||||
BlockRangeParams { parent: Some(B256::ZERO), tx_count: 0..1, ..Default::default() },
|
||||
);
|
||||
db.insert_blocks(blocks.iter(), StorageKind::Database(None)).expect("insert blocks");
|
||||
|
||||
let accounts = random_eoa_accounts(&mut rng, 2).into_iter().collect::<BTreeMap<_, _>>();
|
||||
|
||||
let (changesets, _) = random_changeset_range(
|
||||
&mut rng,
|
||||
blocks.iter(),
|
||||
accounts.into_iter().map(|(addr, acc)| (addr, (acc, Vec::new()))),
|
||||
0..0,
|
||||
0..0,
|
||||
);
|
||||
|
||||
db.insert_changesets_to_static_files(changesets.clone(), None)
|
||||
.expect("insert changesets to static files");
|
||||
db.insert_history(changesets.clone(), None).expect("insert history");
|
||||
|
||||
let account_occurrences = db.table::<tables::AccountsHistory>().unwrap().into_iter().fold(
|
||||
BTreeMap::<_, usize>::new(),
|
||||
|mut map, (key, _)| {
|
||||
map.entry(key.key).or_default().add_assign(1);
|
||||
map
|
||||
},
|
||||
);
|
||||
assert!(account_occurrences.into_iter().any(|(_, occurrences)| occurrences > 1));
|
||||
|
||||
let original_shards = db.table::<tables::AccountsHistory>().unwrap();
|
||||
|
||||
let test_prune =
|
||||
|to_block: BlockNumber, run: usize, expected_result: (PruneProgress, usize)| {
|
||||
let prune_mode = PruneMode::Before(to_block);
|
||||
let deleted_entries_limit = 2000;
|
||||
let mut limiter =
|
||||
PruneLimiter::default().set_deleted_entries_limit(deleted_entries_limit);
|
||||
let input = PruneInput {
|
||||
previous_checkpoint: db
|
||||
.factory
|
||||
.provider()
|
||||
.unwrap()
|
||||
.get_prune_checkpoint(PruneSegment::AccountHistory)
|
||||
.unwrap(),
|
||||
to_block,
|
||||
limiter: limiter.clone(),
|
||||
};
|
||||
let segment = AccountHistory::new(prune_mode);
|
||||
|
||||
let provider = db.factory.database_provider_rw().unwrap();
|
||||
provider.set_storage_settings_cache(StorageSettings::v2());
|
||||
let result = segment.prune(&provider, input).unwrap();
|
||||
limiter.increment_deleted_entries_count_by(result.pruned);
|
||||
|
||||
assert_matches!(
|
||||
result,
|
||||
SegmentOutput {progress, pruned, checkpoint: Some(_)}
|
||||
if (progress, pruned) == expected_result
|
||||
);
|
||||
|
||||
segment
|
||||
.save_checkpoint(
|
||||
&provider,
|
||||
result.checkpoint.unwrap().as_prune_checkpoint(prune_mode),
|
||||
)
|
||||
.unwrap();
|
||||
provider.commit().expect("commit");
|
||||
|
||||
let changesets = changesets
|
||||
.iter()
|
||||
.enumerate()
|
||||
.flat_map(|(block_number, changeset)| {
|
||||
changeset.iter().map(move |change| (block_number, change))
|
||||
})
|
||||
.collect::<Vec<_>>();
|
||||
|
||||
#[expect(clippy::skip_while_next)]
|
||||
let pruned = changesets
|
||||
.iter()
|
||||
.enumerate()
|
||||
.skip_while(|(i, (block_number, _))| {
|
||||
*i < deleted_entries_limit / ACCOUNT_HISTORY_TABLES_TO_PRUNE * run &&
|
||||
*block_number <= to_block as usize
|
||||
})
|
||||
.next()
|
||||
.map(|(i, _)| i)
|
||||
.unwrap_or_default();
|
||||
|
||||
// Skip what we've pruned so far, subtracting one to get last pruned block number
|
||||
// further down
|
||||
let mut pruned_changesets = changesets.iter().skip(pruned.saturating_sub(1));
|
||||
|
||||
let last_pruned_block_number = pruned_changesets
|
||||
.next()
|
||||
.map(|(block_number, _)| {
|
||||
(if result.progress.is_finished() {
|
||||
*block_number
|
||||
} else {
|
||||
block_number.saturating_sub(1)
|
||||
}) as BlockNumber
|
||||
})
|
||||
.unwrap_or(to_block);
|
||||
|
||||
let actual_shards = db.table::<tables::AccountsHistory>().unwrap();
|
||||
|
||||
let expected_shards = original_shards
|
||||
.iter()
|
||||
.filter(|(key, _)| key.highest_block_number > last_pruned_block_number)
|
||||
.map(|(key, blocks)| {
|
||||
let new_blocks =
|
||||
blocks.iter().skip_while(|block| *block <= last_pruned_block_number);
|
||||
(key.clone(), BlockNumberList::new_pre_sorted(new_blocks))
|
||||
})
|
||||
.collect::<Vec<_>>();
|
||||
|
||||
assert_eq!(actual_shards, expected_shards);
|
||||
|
||||
assert_eq!(
|
||||
db.factory
|
||||
.provider()
|
||||
.unwrap()
|
||||
.get_prune_checkpoint(PruneSegment::AccountHistory)
|
||||
.unwrap(),
|
||||
Some(PruneCheckpoint {
|
||||
block_number: Some(last_pruned_block_number),
|
||||
tx_number: None,
|
||||
prune_mode
|
||||
})
|
||||
);
|
||||
};
|
||||
|
||||
test_prune(
|
||||
998,
|
||||
1,
|
||||
(PruneProgress::HasMoreData(PruneInterruptReason::DeletedEntriesLimitReached), 1000),
|
||||
);
|
||||
test_prune(998, 2, (PruneProgress::Finished, 1000));
|
||||
test_prune(1400, 3, (PruneProgress::Finished, 804));
|
||||
}
|
||||
|
||||
#[cfg(all(unix, feature = "rocksdb"))]
|
||||
#[test]
|
||||
fn prune_rocksdb_path() {
|
||||
use reth_db_api::models::ShardedKey;
|
||||
|
||||
@@ -75,6 +75,7 @@ where
|
||||
let range_end = *range.end();
|
||||
|
||||
// Check where storage history indices are stored
|
||||
#[cfg(all(unix, feature = "rocksdb"))]
|
||||
if provider.cached_storage_settings().storage_v2 {
|
||||
return self.prune_rocksdb(provider, input, range, range_end);
|
||||
}
|
||||
@@ -235,6 +236,7 @@ impl StorageHistory {
|
||||
///
|
||||
/// Reads storage changesets from static files and prunes the corresponding
|
||||
/// `RocksDB` history shards.
|
||||
#[cfg(all(unix, feature = "rocksdb"))]
|
||||
fn prune_rocksdb<Provider>(
|
||||
&self,
|
||||
provider: &Provider,
|
||||
@@ -516,6 +518,159 @@ mod tests {
|
||||
test_prune(1200, 3, (PruneProgress::Finished, 202));
|
||||
}
|
||||
|
||||
/// Tests the `prune_static_files` code path. On unix with rocksdb feature, v2 storage
|
||||
/// routes to `prune_rocksdb` instead, so this test only runs without rocksdb (the
|
||||
/// `prune_rocksdb_path` test covers that configuration).
|
||||
#[test]
|
||||
#[cfg(not(all(unix, feature = "rocksdb")))]
|
||||
fn prune_static_file() {
|
||||
let db = TestStageDB::default();
|
||||
let mut rng = generators::rng();
|
||||
|
||||
let blocks = random_block_range(
|
||||
&mut rng,
|
||||
0..=5000,
|
||||
BlockRangeParams { parent: Some(B256::ZERO), tx_count: 0..1, ..Default::default() },
|
||||
);
|
||||
db.insert_blocks(blocks.iter(), StorageKind::Database(None)).expect("insert blocks");
|
||||
|
||||
let accounts = random_eoa_accounts(&mut rng, 2).into_iter().collect::<BTreeMap<_, _>>();
|
||||
|
||||
let (changesets, _) = random_changeset_range(
|
||||
&mut rng,
|
||||
blocks.iter(),
|
||||
accounts.into_iter().map(|(addr, acc)| (addr, (acc, Vec::new()))),
|
||||
1..2,
|
||||
1..2,
|
||||
);
|
||||
|
||||
db.insert_changesets_to_static_files(changesets.clone(), None)
|
||||
.expect("insert changesets to static files");
|
||||
db.insert_history(changesets.clone(), None).expect("insert history");
|
||||
|
||||
let storage_occurrences = db.table::<tables::StoragesHistory>().unwrap().into_iter().fold(
|
||||
BTreeMap::<_, usize>::new(),
|
||||
|mut map, (key, _)| {
|
||||
map.entry((key.address, key.sharded_key.key)).or_default().add_assign(1);
|
||||
map
|
||||
},
|
||||
);
|
||||
assert!(storage_occurrences.into_iter().any(|(_, occurrences)| occurrences > 1));
|
||||
|
||||
let original_shards = db.table::<tables::StoragesHistory>().unwrap();
|
||||
|
||||
let test_prune = |to_block: BlockNumber,
|
||||
run: usize,
|
||||
expected_result: (PruneProgress, usize)| {
|
||||
let prune_mode = PruneMode::Before(to_block);
|
||||
let deleted_entries_limit = 1000;
|
||||
let mut limiter =
|
||||
PruneLimiter::default().set_deleted_entries_limit(deleted_entries_limit);
|
||||
let input = PruneInput {
|
||||
previous_checkpoint: db
|
||||
.factory
|
||||
.provider()
|
||||
.unwrap()
|
||||
.get_prune_checkpoint(PruneSegment::StorageHistory)
|
||||
.unwrap(),
|
||||
to_block,
|
||||
limiter: limiter.clone(),
|
||||
};
|
||||
let segment = StorageHistory::new(prune_mode);
|
||||
|
||||
let provider = db.factory.database_provider_rw().unwrap();
|
||||
provider.set_storage_settings_cache(StorageSettings::v2());
|
||||
let result = segment.prune(&provider, input).unwrap();
|
||||
limiter.increment_deleted_entries_count_by(result.pruned);
|
||||
|
||||
assert_matches!(
|
||||
result,
|
||||
SegmentOutput {progress, pruned, checkpoint: Some(_)}
|
||||
if (progress, pruned) == expected_result
|
||||
);
|
||||
|
||||
segment
|
||||
.save_checkpoint(
|
||||
&provider,
|
||||
result.checkpoint.unwrap().as_prune_checkpoint(prune_mode),
|
||||
)
|
||||
.unwrap();
|
||||
provider.commit().expect("commit");
|
||||
|
||||
let changesets = changesets
|
||||
.iter()
|
||||
.enumerate()
|
||||
.flat_map(|(block_number, changeset)| {
|
||||
changeset.iter().flat_map(move |(address, _, entries)| {
|
||||
entries.iter().map(move |entry| (block_number, address, entry))
|
||||
})
|
||||
})
|
||||
.collect::<Vec<_>>();
|
||||
|
||||
#[expect(clippy::skip_while_next)]
|
||||
let pruned = changesets
|
||||
.iter()
|
||||
.enumerate()
|
||||
.skip_while(|(i, (block_number, _, _))| {
|
||||
*i < deleted_entries_limit / STORAGE_HISTORY_TABLES_TO_PRUNE * run &&
|
||||
*block_number <= to_block as usize
|
||||
})
|
||||
.next()
|
||||
.map(|(i, _)| i)
|
||||
.unwrap_or_default();
|
||||
|
||||
// Skip what we've pruned so far, subtracting one to get last pruned block number
|
||||
// further down
|
||||
let mut pruned_changesets = changesets.iter().skip(pruned.saturating_sub(1));
|
||||
|
||||
let last_pruned_block_number = pruned_changesets
|
||||
.next()
|
||||
.map(|(block_number, _, _)| {
|
||||
(if result.progress.is_finished() {
|
||||
*block_number
|
||||
} else {
|
||||
block_number.saturating_sub(1)
|
||||
}) as BlockNumber
|
||||
})
|
||||
.unwrap_or(to_block);
|
||||
|
||||
let actual_shards = db.table::<tables::StoragesHistory>().unwrap();
|
||||
|
||||
let expected_shards = original_shards
|
||||
.iter()
|
||||
.filter(|(key, _)| key.sharded_key.highest_block_number > last_pruned_block_number)
|
||||
.map(|(key, blocks)| {
|
||||
let new_blocks =
|
||||
blocks.iter().skip_while(|block| *block <= last_pruned_block_number);
|
||||
(key.clone(), BlockNumberList::new_pre_sorted(new_blocks))
|
||||
})
|
||||
.collect::<Vec<_>>();
|
||||
|
||||
assert_eq!(actual_shards, expected_shards);
|
||||
|
||||
assert_eq!(
|
||||
db.factory
|
||||
.provider()
|
||||
.unwrap()
|
||||
.get_prune_checkpoint(PruneSegment::StorageHistory)
|
||||
.unwrap(),
|
||||
Some(PruneCheckpoint {
|
||||
block_number: Some(last_pruned_block_number),
|
||||
tx_number: None,
|
||||
prune_mode
|
||||
})
|
||||
);
|
||||
};
|
||||
|
||||
test_prune(
|
||||
998,
|
||||
1,
|
||||
(PruneProgress::HasMoreData(PruneInterruptReason::DeletedEntriesLimitReached), 500),
|
||||
);
|
||||
test_prune(998, 2, (PruneProgress::Finished, 500));
|
||||
test_prune(1200, 3, (PruneProgress::Finished, 202));
|
||||
}
|
||||
|
||||
/// Tests that when a limiter stops mid-block (with multiple storage changes for the same
|
||||
/// block), the checkpoint is set to `block_number - 1` to avoid dangling index entries.
|
||||
#[test]
|
||||
@@ -666,6 +821,7 @@ mod tests {
|
||||
assert!(final_changesets.is_empty(), "All changesets up to block 10 should be pruned");
|
||||
}
|
||||
|
||||
#[cfg(all(unix, feature = "rocksdb"))]
|
||||
#[test]
|
||||
fn prune_rocksdb() {
|
||||
use reth_db_api::models::storage_sharded_key::StorageShardedKey;
|
||||
|
||||
@@ -95,6 +95,7 @@ where
|
||||
.into_inner();
|
||||
|
||||
// Check where transaction hash numbers are stored
|
||||
#[cfg(all(unix, feature = "rocksdb"))]
|
||||
if provider.cached_storage_settings().storage_v2 {
|
||||
return self.prune_rocksdb(provider, input, start, end);
|
||||
}
|
||||
@@ -195,6 +196,7 @@ impl TransactionLookup {
|
||||
///
|
||||
/// Reads transactions from static files and deletes corresponding entries
|
||||
/// from the `RocksDB` `TransactionHashNumbers` table.
|
||||
#[cfg(all(unix, feature = "rocksdb"))]
|
||||
fn prune_rocksdb<Provider>(
|
||||
&self,
|
||||
provider: &Provider,
|
||||
@@ -436,6 +438,7 @@ mod tests {
|
||||
test_prune(10, (PruneProgress::Finished, 8));
|
||||
}
|
||||
|
||||
#[cfg(all(unix, feature = "rocksdb"))]
|
||||
#[test]
|
||||
fn prune_rocksdb() {
|
||||
use reth_db_api::models::StorageSettings;
|
||||
@@ -536,6 +539,7 @@ mod tests {
|
||||
/// 1. Some transactions have already been pruned (checkpoint at tx 5)
|
||||
/// 2. The deleted entries limit is exhausted before any new deletions
|
||||
/// 3. The checkpoint should NOT advance to the next start position
|
||||
#[cfg(all(unix, feature = "rocksdb"))]
|
||||
#[test]
|
||||
fn prune_rocksdb_zero_deleted_checkpoint() {
|
||||
use reth_db_api::models::StorageSettings;
|
||||
|
||||
@@ -491,10 +491,6 @@ pub trait EthCall: EstimateCall + Call + LoadPendingBlock + LoadBlock + FullEthA
|
||||
// Disabled because eth_createAccessList is sometimes used with non-eoa senders
|
||||
evm_env.cfg_env.disable_eip3607 = true;
|
||||
|
||||
// Disable additional fee charges (e.g. L2 operator fees),
|
||||
// consistent with prepare_call_env and estimate_gas_with.
|
||||
evm_env.cfg_env.disable_fee_charge = true;
|
||||
|
||||
// Disable EIP-7825 transaction gas limit cap so that the gas limit
|
||||
// fallback (block gas limit) is not rejected when it exceeds the
|
||||
// per-tx cap (2^24 ≈ 16.7M post-Osaka).
|
||||
|
||||
@@ -104,16 +104,11 @@ where
|
||||
fork_timestamps.sort_unstable();
|
||||
fork_timestamps.dedup();
|
||||
|
||||
let current_fork_idx = match fork_timestamps.iter().position(|ts| &latest.timestamp() < ts)
|
||||
{
|
||||
// All forks are in the past, use the last one.
|
||||
None => fork_timestamps.len().checked_sub(1),
|
||||
// First fork hasn't activated yet — no active timestamp fork.
|
||||
Some(0) => None,
|
||||
// Found a future fork; current is the one right before it.
|
||||
Some(idx) => Some(idx - 1),
|
||||
};
|
||||
let (current_fork_idx, current_fork_timestamp) = current_fork_idx
|
||||
let (current_fork_idx, current_fork_timestamp) = fork_timestamps
|
||||
.iter()
|
||||
.position(|ts| &latest.timestamp() < ts)
|
||||
.and_then(|idx| idx.checked_sub(1))
|
||||
.or_else(|| fork_timestamps.len().checked_sub(1))
|
||||
.and_then(|idx| fork_timestamps.get(idx).map(|ts| (idx, *ts)))
|
||||
.ok_or_else(|| RethError::msg("no active timestamp fork found"))?;
|
||||
|
||||
|
||||
@@ -42,7 +42,6 @@ pub trait EstimateCall: Call {
|
||||
///
|
||||
/// - `disable_eip3607` is set to `true`
|
||||
/// - `disable_base_fee` is set to `true`
|
||||
/// - `disable_fee_charge` is set to `true`
|
||||
/// - `nonce` is set to `None`
|
||||
fn estimate_gas_with<S>(
|
||||
&self,
|
||||
@@ -63,10 +62,6 @@ pub trait EstimateCall: Call {
|
||||
// <https://github.com/ethereum/go-ethereum/blob/ee8e83fa5f6cb261dad2ed0a7bbcde4930c41e6c/internal/ethapi/api.go#L985>
|
||||
evm_env.cfg_env.disable_base_fee = true;
|
||||
|
||||
// Disable additional fee charges (e.g. L2 operator fees) for gas estimation,
|
||||
// consistent with `prepare_call_env` for `eth_call`.
|
||||
evm_env.cfg_env.disable_fee_charge = true;
|
||||
|
||||
// set nonce to None so that the correct nonce is chosen by the EVM
|
||||
request.as_mut().take_nonce();
|
||||
|
||||
|
||||
@@ -295,7 +295,6 @@ where
|
||||
fn extract_reward_traces<H: BlockHeader>(
|
||||
&self,
|
||||
header: &H,
|
||||
block_hash: BlockHash,
|
||||
ommers: Option<&[H]>,
|
||||
base_block_reward: u128,
|
||||
) -> Vec<LocalizedTransactionTrace> {
|
||||
@@ -304,7 +303,6 @@ where
|
||||
|
||||
let block_reward = block_reward(base_block_reward, ommers_cnt);
|
||||
traces.push(reward_trace(
|
||||
block_hash,
|
||||
header,
|
||||
RewardAction {
|
||||
author: header.beneficiary(),
|
||||
@@ -318,7 +316,6 @@ where
|
||||
for uncle in ommers {
|
||||
let uncle_reward = ommer_reward(base_block_reward, header.number(), uncle.number());
|
||||
traces.push(reward_trace(
|
||||
block_hash,
|
||||
header,
|
||||
RewardAction {
|
||||
author: uncle.beneficiary(),
|
||||
@@ -431,7 +428,6 @@ where
|
||||
all_traces.extend(
|
||||
self.extract_reward_traces(
|
||||
block.header(),
|
||||
block.hash(),
|
||||
block.body().ommers(),
|
||||
base_block_reward,
|
||||
)
|
||||
@@ -506,7 +502,6 @@ where
|
||||
{
|
||||
traces.extend(self.extract_reward_traces(
|
||||
block.header(),
|
||||
block.hash(),
|
||||
block.body().ommers(),
|
||||
base_block_reward,
|
||||
));
|
||||
@@ -800,13 +795,9 @@ pub struct BlockStorageAccess {
|
||||
|
||||
/// Helper to construct a [`LocalizedTransactionTrace`] that describes a reward to the block
|
||||
/// beneficiary.
|
||||
fn reward_trace<H: BlockHeader>(
|
||||
block_hash: BlockHash,
|
||||
header: &H,
|
||||
reward: RewardAction,
|
||||
) -> LocalizedTransactionTrace {
|
||||
fn reward_trace<H: BlockHeader>(header: &H, reward: RewardAction) -> LocalizedTransactionTrace {
|
||||
LocalizedTransactionTrace {
|
||||
block_hash: Some(block_hash),
|
||||
block_hash: Some(header.hash_slow()),
|
||||
block_number: Some(header.number()),
|
||||
transaction_hash: None,
|
||||
transaction_position: None,
|
||||
|
||||
@@ -378,7 +378,7 @@ impl<N: ProviderNodeTypes> Pipeline<N> {
|
||||
});
|
||||
}
|
||||
|
||||
// update finalized and safe block if needed
|
||||
// update finalized block if needed
|
||||
let last_saved_finalized_block_number =
|
||||
provider_rw.last_finalized_block_number()?;
|
||||
|
||||
@@ -392,16 +392,6 @@ impl<N: ProviderNodeTypes> Pipeline<N> {
|
||||
))?;
|
||||
}
|
||||
|
||||
let last_saved_safe_block_number = provider_rw.last_safe_block_number()?;
|
||||
|
||||
if last_saved_safe_block_number.is_none() ||
|
||||
Some(checkpoint.block_number) < last_saved_safe_block_number
|
||||
{
|
||||
provider_rw.save_safe_block_number(BlockNumber::from(
|
||||
checkpoint.block_number,
|
||||
))?;
|
||||
}
|
||||
|
||||
provider_rw.commit()?;
|
||||
|
||||
stage.post_unwind_commit()?;
|
||||
|
||||
@@ -121,3 +121,5 @@ test-utils = [
|
||||
"reth-evm-ethereum/test-utils",
|
||||
"reth-tasks/test-utils",
|
||||
]
|
||||
rocksdb = ["reth-provider/rocksdb", "reth-db-common/rocksdb"]
|
||||
edge = ["rocksdb"]
|
||||
|
||||
@@ -1,7 +1,9 @@
|
||||
use super::collect_account_history_indices;
|
||||
use crate::stages::utils::{collect_history_indices, load_account_history};
|
||||
use reth_config::config::{EtlConfig, IndexHistoryConfig};
|
||||
use reth_db_api::{models::ShardedKey, tables, transaction::DbTxMut, Tables};
|
||||
#[cfg(all(unix, feature = "rocksdb"))]
|
||||
use reth_db_api::Tables;
|
||||
use reth_db_api::{models::ShardedKey, tables, transaction::DbTxMut};
|
||||
use reth_provider::{
|
||||
DBProvider, EitherWriter, HistoryWriter, PruneCheckpointReader, PruneCheckpointWriter,
|
||||
RocksDBProviderFactory, StorageSettingsCache,
|
||||
@@ -142,6 +144,7 @@ where
|
||||
Ok(((), writer.into_raw_rocksdb_batch()))
|
||||
})?;
|
||||
|
||||
#[cfg(all(unix, feature = "rocksdb"))]
|
||||
if use_rocksdb {
|
||||
provider.commit_pending_rocksdb_batches()?;
|
||||
provider.rocksdb_provider().flush(&[Tables::AccountsHistory.name()])?;
|
||||
@@ -660,6 +663,7 @@ mod tests {
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(all(unix, feature = "rocksdb"))]
|
||||
mod rocksdb_tests {
|
||||
use super::*;
|
||||
use reth_provider::{
|
||||
|
||||
@@ -1,11 +1,12 @@
|
||||
use super::{collect_history_indices, collect_storage_history_indices};
|
||||
use crate::{stages::utils::load_storage_history, StageCheckpoint, StageId};
|
||||
use reth_config::config::{EtlConfig, IndexHistoryConfig};
|
||||
#[cfg(all(unix, feature = "rocksdb"))]
|
||||
use reth_db_api::Tables;
|
||||
use reth_db_api::{
|
||||
models::{storage_sharded_key::StorageShardedKey, AddressStorageKey, BlockNumberAddress},
|
||||
tables,
|
||||
transaction::DbTxMut,
|
||||
Tables,
|
||||
};
|
||||
use reth_provider::{
|
||||
DBProvider, EitherWriter, HistoryWriter, PruneCheckpointReader, PruneCheckpointWriter,
|
||||
@@ -147,6 +148,7 @@ where
|
||||
Ok(((), writer.into_raw_rocksdb_batch()))
|
||||
})?;
|
||||
|
||||
#[cfg(all(unix, feature = "rocksdb"))]
|
||||
if use_rocksdb {
|
||||
provider.commit_pending_rocksdb_batches()?;
|
||||
provider.rocksdb_provider().flush(&[Tables::StoragesHistory.name()])?;
|
||||
@@ -689,6 +691,7 @@ mod tests {
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(all(unix, feature = "rocksdb"))]
|
||||
mod rocksdb_tests {
|
||||
use super::*;
|
||||
use reth_db_api::models::StorageBeforeTx;
|
||||
|
||||
@@ -2,11 +2,12 @@ use alloy_eips::eip2718::Encodable2718;
|
||||
use alloy_primitives::{TxHash, TxNumber};
|
||||
use num_traits::Zero;
|
||||
use reth_config::config::{EtlConfig, TransactionLookupConfig};
|
||||
#[cfg(all(unix, feature = "rocksdb"))]
|
||||
use reth_db_api::Tables;
|
||||
use reth_db_api::{
|
||||
table::{Decode, Decompress, Value},
|
||||
tables,
|
||||
transaction::DbTxMut,
|
||||
Tables,
|
||||
};
|
||||
use reth_etl::Collector;
|
||||
use reth_primitives_traits::{NodePrimitives, SignedTransaction};
|
||||
@@ -198,6 +199,7 @@ where
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(all(unix, feature = "rocksdb"))]
|
||||
if provider.cached_storage_settings().storage_v2 {
|
||||
provider.commit_pending_rocksdb_batches()?;
|
||||
provider.rocksdb_provider().flush(&[Tables::TransactionHashNumbers.name()])?;
|
||||
@@ -599,6 +601,7 @@ mod tests {
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(all(unix, feature = "rocksdb"))]
|
||||
mod rocksdb_tests {
|
||||
use super::*;
|
||||
use reth_provider::RocksDBProviderFactory;
|
||||
|
||||
@@ -36,3 +36,6 @@ reth-testing-utils.workspace = true
|
||||
|
||||
assert_matches.workspace = true
|
||||
tempfile.workspace = true
|
||||
|
||||
[features]
|
||||
edge = ["reth-stages/edge"]
|
||||
|
||||
@@ -93,3 +93,5 @@ op = [
|
||||
"reth-codecs/op",
|
||||
"reth-primitives-traits/op",
|
||||
]
|
||||
rocksdb = []
|
||||
edge = ["rocksdb"]
|
||||
|
||||
@@ -28,8 +28,20 @@ pub struct StorageSettings {
|
||||
|
||||
impl StorageSettings {
|
||||
/// Returns the default base `StorageSettings`.
|
||||
///
|
||||
/// When the `edge` feature is enabled, returns [`Self::v2()`] so that CI and
|
||||
/// edge builds automatically use v2 storage defaults. Otherwise returns
|
||||
/// [`Self::v1()`]. The `--storage.v2` CLI flag can also opt into v2 at runtime
|
||||
/// regardless of feature flags.
|
||||
pub const fn base() -> Self {
|
||||
Self::v2()
|
||||
#[cfg(feature = "edge")]
|
||||
{
|
||||
Self::v2()
|
||||
}
|
||||
#[cfg(not(feature = "edge"))]
|
||||
{
|
||||
Self::v1()
|
||||
}
|
||||
}
|
||||
|
||||
/// Creates `StorageSettings` for v2 nodes with all storage features enabled:
|
||||
|
||||
@@ -46,5 +46,9 @@ reth-db = { workspace = true, features = ["mdbx"] }
|
||||
reth-provider = { workspace = true, features = ["test-utils"] }
|
||||
reth-tasks.workspace = true
|
||||
|
||||
[features]
|
||||
rocksdb = ["reth-db-api/rocksdb", "reth-provider/rocksdb"]
|
||||
edge = ["rocksdb"]
|
||||
|
||||
[lints]
|
||||
workspace = true
|
||||
|
||||
@@ -633,7 +633,7 @@ where
|
||||
.ok_or_else(|| eyre::eyre!("Block hash not found for block {}", block))?;
|
||||
let header = provider_rw
|
||||
.header_by_number(block)?
|
||||
.map(|h| SealedHeader::new(h, hash))
|
||||
.map(SealedHeader::seal_slow)
|
||||
.ok_or_else(|| ProviderError::HeaderNotFound(block.into()))?;
|
||||
|
||||
let expected_state_root = header.state_root();
|
||||
@@ -1051,6 +1051,7 @@ mod tests {
|
||||
)
|
||||
};
|
||||
|
||||
#[cfg(feature = "rocksdb")]
|
||||
{
|
||||
let settings = factory.cached_storage_settings();
|
||||
let rocksdb = factory.rocksdb_provider();
|
||||
@@ -1078,6 +1079,13 @@ mod tests {
|
||||
assert_eq!(accounts, expected_accounts);
|
||||
assert_eq!(storages, expected_storages);
|
||||
}
|
||||
|
||||
#[cfg(not(feature = "rocksdb"))]
|
||||
{
|
||||
let (accounts, storages) = collect_from_mdbx(&factory);
|
||||
assert_eq!(accounts, expected_accounts);
|
||||
assert_eq!(storages, expected_storages);
|
||||
}
|
||||
}
|
||||
|
||||
#[test]
|
||||
|
||||
@@ -63,7 +63,9 @@ tokio = { workspace = true, features = ["sync"], optional = true }
|
||||
# parallel utils
|
||||
rayon.workspace = true
|
||||
|
||||
rocksdb = { workspace = true, features = ["jemalloc"] }
|
||||
[target.'cfg(unix)'.dependencies]
|
||||
# rocksdb: jemalloc is recommended production workload
|
||||
rocksdb = { workspace = true, features = ["jemalloc"], optional = true }
|
||||
|
||||
[dev-dependencies]
|
||||
reth-db = { workspace = true, features = ["test-utils"] }
|
||||
@@ -85,6 +87,8 @@ rand.workspace = true
|
||||
tokio = { workspace = true, features = ["sync", "macros", "rt-multi-thread"] }
|
||||
|
||||
[features]
|
||||
rocksdb = ["reth-storage-api/rocksdb", "dep:rocksdb"]
|
||||
edge = ["rocksdb"]
|
||||
test-utils = [
|
||||
"reth-db/test-utils",
|
||||
"reth-nippy-jar/test-utils",
|
||||
|
||||
@@ -7,11 +7,10 @@ use std::{
|
||||
ops::{Range, RangeInclusive},
|
||||
};
|
||||
|
||||
#[cfg(all(unix, feature = "rocksdb"))]
|
||||
use crate::providers::rocksdb::RocksDBBatch;
|
||||
use crate::{
|
||||
providers::{
|
||||
history_info, rocksdb::RocksDBBatch, HistoryInfo, StaticFileProvider,
|
||||
StaticFileProviderRWRefMut,
|
||||
},
|
||||
providers::{history_info, HistoryInfo, StaticFileProvider, StaticFileProviderRWRefMut},
|
||||
StaticFileProviderFactory,
|
||||
};
|
||||
use alloy_primitives::{map::HashMap, Address, BlockNumber, TxHash, TxNumber, B256};
|
||||
@@ -63,16 +62,40 @@ type EitherWriterTy<'a, P, T> = EitherWriter<
|
||||
>;
|
||||
|
||||
/// Helper type for `RocksDB` batch argument in writer constructors.
|
||||
///
|
||||
/// When `rocksdb` feature is enabled, this is a real `RocksDB` batch.
|
||||
/// Otherwise, it's `()` (unit type) to allow the same API without feature gates.
|
||||
#[cfg(all(unix, feature = "rocksdb"))]
|
||||
pub type RocksBatchArg<'a> = crate::providers::rocksdb::RocksDBBatch<'a>;
|
||||
/// Helper type for `RocksDB` batch argument in writer constructors.
|
||||
///
|
||||
/// When `rocksdb` feature is enabled, this is a real `RocksDB` batch.
|
||||
/// Otherwise, it's `()` (unit type) to allow the same API without feature gates.
|
||||
#[cfg(not(all(unix, feature = "rocksdb")))]
|
||||
pub type RocksBatchArg<'a> = ();
|
||||
|
||||
/// The raw `RocksDB` batch type returned by [`EitherWriter::into_raw_rocksdb_batch`].
|
||||
#[cfg(all(unix, feature = "rocksdb"))]
|
||||
pub type RawRocksDBBatch = rocksdb::WriteBatchWithTransaction<true>;
|
||||
/// The raw `RocksDB` batch type returned by [`EitherWriter::into_raw_rocksdb_batch`].
|
||||
#[cfg(not(all(unix, feature = "rocksdb")))]
|
||||
pub type RawRocksDBBatch = ();
|
||||
|
||||
/// Helper type for `RocksDB` transaction reference argument in reader constructors.
|
||||
///
|
||||
/// When `rocksdb` feature is enabled, this is an optional reference to a `RocksDB` transaction.
|
||||
/// The `Option` allows callers to skip transaction creation when `RocksDB` isn't needed
|
||||
/// (e.g., on legacy MDBX-only nodes).
|
||||
/// When `rocksdb` feature is disabled, it's `()` (unit type) to allow the same API without
|
||||
/// feature gates.
|
||||
#[cfg(all(unix, feature = "rocksdb"))]
|
||||
pub type RocksTxRefArg<'a> = Option<&'a crate::providers::rocksdb::RocksTx<'a>>;
|
||||
/// Helper type for `RocksDB` transaction reference argument in reader constructors.
|
||||
///
|
||||
/// When `rocksdb` feature is disabled, it's `()` (unit type) to allow the same API without
|
||||
/// feature gates.
|
||||
#[cfg(not(all(unix, feature = "rocksdb")))]
|
||||
pub type RocksTxRefArg<'a> = ();
|
||||
|
||||
/// Represents a destination for writing data, either to database, static files, or `RocksDB`.
|
||||
#[derive(Debug, Display)]
|
||||
@@ -82,6 +105,7 @@ pub enum EitherWriter<'a, CURSOR, N> {
|
||||
/// Write to static file
|
||||
StaticFile(StaticFileProviderRWRefMut<'a, N>),
|
||||
/// Write to `RocksDB` using a write-only batch (historical tables).
|
||||
#[cfg(all(unix, feature = "rocksdb"))]
|
||||
RocksDB(RocksDBBatch<'a>),
|
||||
}
|
||||
|
||||
@@ -230,6 +254,7 @@ impl<'a> EitherWriter<'a, (), ()> {
|
||||
P: DBProvider + NodePrimitivesProvider + StorageSettingsCache,
|
||||
P::Tx: DbTxMut,
|
||||
{
|
||||
#[cfg(all(unix, feature = "rocksdb"))]
|
||||
if provider.cached_storage_settings().storage_v2 {
|
||||
return Ok(EitherWriter::RocksDB(_rocksdb_batch));
|
||||
}
|
||||
@@ -246,6 +271,7 @@ impl<'a> EitherWriter<'a, (), ()> {
|
||||
P: DBProvider + NodePrimitivesProvider + StorageSettingsCache,
|
||||
P::Tx: DbTxMut,
|
||||
{
|
||||
#[cfg(all(unix, feature = "rocksdb"))]
|
||||
if provider.cached_storage_settings().storage_v2 {
|
||||
return Ok(EitherWriter::RocksDB(_rocksdb_batch));
|
||||
}
|
||||
@@ -264,6 +290,7 @@ impl<'a> EitherWriter<'a, (), ()> {
|
||||
P: DBProvider + NodePrimitivesProvider + StorageSettingsCache,
|
||||
P::Tx: DbTxMut,
|
||||
{
|
||||
#[cfg(all(unix, feature = "rocksdb"))]
|
||||
if provider.cached_storage_settings().storage_v2 {
|
||||
return Ok(EitherWriter::RocksDB(_rocksdb_batch));
|
||||
}
|
||||
@@ -280,6 +307,7 @@ impl<'a, CURSOR, N: NodePrimitives> EitherWriter<'a, CURSOR, N> {
|
||||
///
|
||||
/// This is used to defer `RocksDB` commits to the provider level, ensuring all
|
||||
/// storage commits (MDBX, static files, `RocksDB`) happen atomically in a single place.
|
||||
#[cfg(all(unix, feature = "rocksdb"))]
|
||||
pub fn into_raw_rocksdb_batch(self) -> Option<rocksdb::WriteBatchWithTransaction<true>> {
|
||||
match self {
|
||||
Self::Database(_) | Self::StaticFile(_) => None,
|
||||
@@ -287,6 +315,16 @@ impl<'a, CURSOR, N: NodePrimitives> EitherWriter<'a, CURSOR, N> {
|
||||
}
|
||||
}
|
||||
|
||||
/// Extracts the raw `RocksDB` write batch from this writer, if it contains one.
|
||||
///
|
||||
/// Without the `rocksdb` feature, this always returns `None`.
|
||||
#[cfg(not(all(unix, feature = "rocksdb")))]
|
||||
pub fn into_raw_rocksdb_batch(self) -> Option<RawRocksDBBatch> {
|
||||
match self {
|
||||
Self::Database(_) | Self::StaticFile(_) => None,
|
||||
}
|
||||
}
|
||||
|
||||
/// Increment the block number.
|
||||
///
|
||||
/// Relevant only for [`Self::StaticFile`]. It is a no-op for [`Self::Database`].
|
||||
@@ -294,6 +332,7 @@ impl<'a, CURSOR, N: NodePrimitives> EitherWriter<'a, CURSOR, N> {
|
||||
match self {
|
||||
Self::Database(_) => Ok(()),
|
||||
Self::StaticFile(writer) => writer.increment_block(expected_block_number),
|
||||
#[cfg(all(unix, feature = "rocksdb"))]
|
||||
Self::RocksDB(_) => Err(ProviderError::UnsupportedProvider),
|
||||
}
|
||||
}
|
||||
@@ -308,6 +347,7 @@ impl<'a, CURSOR, N: NodePrimitives> EitherWriter<'a, CURSOR, N> {
|
||||
match self {
|
||||
Self::Database(_) => Ok(()),
|
||||
Self::StaticFile(writer) => writer.ensure_at_block(block_number),
|
||||
#[cfg(all(unix, feature = "rocksdb"))]
|
||||
Self::RocksDB(_) => Err(ProviderError::UnsupportedProvider),
|
||||
}
|
||||
}
|
||||
@@ -323,6 +363,7 @@ where
|
||||
match self {
|
||||
Self::Database(cursor) => Ok(cursor.append(tx_num, receipt)?),
|
||||
Self::StaticFile(writer) => writer.append_receipt(tx_num, receipt),
|
||||
#[cfg(all(unix, feature = "rocksdb"))]
|
||||
Self::RocksDB(_) => Err(ProviderError::UnsupportedProvider),
|
||||
}
|
||||
}
|
||||
@@ -337,6 +378,7 @@ where
|
||||
match self {
|
||||
Self::Database(cursor) => Ok(cursor.append(tx_num, sender)?),
|
||||
Self::StaticFile(writer) => writer.append_transaction_sender(tx_num, sender),
|
||||
#[cfg(all(unix, feature = "rocksdb"))]
|
||||
Self::RocksDB(_) => Err(ProviderError::UnsupportedProvider),
|
||||
}
|
||||
}
|
||||
@@ -354,6 +396,7 @@ where
|
||||
Ok(())
|
||||
}
|
||||
Self::StaticFile(writer) => writer.append_transaction_senders(senders),
|
||||
#[cfg(all(unix, feature = "rocksdb"))]
|
||||
Self::RocksDB(_) => Err(ProviderError::UnsupportedProvider),
|
||||
}
|
||||
}
|
||||
@@ -386,6 +429,7 @@ where
|
||||
|
||||
writer.prune_transaction_senders(to_delete, block)?;
|
||||
}
|
||||
#[cfg(all(unix, feature = "rocksdb"))]
|
||||
Self::RocksDB(_) => return Err(ProviderError::UnsupportedProvider),
|
||||
}
|
||||
|
||||
@@ -417,6 +461,7 @@ where
|
||||
}
|
||||
}
|
||||
Self::StaticFile(_) => Err(ProviderError::UnsupportedProvider),
|
||||
#[cfg(all(unix, feature = "rocksdb"))]
|
||||
Self::RocksDB(batch) => batch.put::<tables::TransactionHashNumbers>(hash, &tx_num),
|
||||
}
|
||||
}
|
||||
@@ -446,6 +491,7 @@ where
|
||||
Ok(())
|
||||
}
|
||||
Self::StaticFile(_) => Err(ProviderError::UnsupportedProvider),
|
||||
#[cfg(all(unix, feature = "rocksdb"))]
|
||||
Self::RocksDB(batch) => {
|
||||
for (hash, tx_num) in entries {
|
||||
batch.put::<tables::TransactionHashNumbers>(hash, &tx_num)?;
|
||||
@@ -465,6 +511,7 @@ where
|
||||
Ok(())
|
||||
}
|
||||
Self::StaticFile(_) => Err(ProviderError::UnsupportedProvider),
|
||||
#[cfg(all(unix, feature = "rocksdb"))]
|
||||
Self::RocksDB(batch) => batch.delete::<tables::TransactionHashNumbers>(hash),
|
||||
}
|
||||
}
|
||||
@@ -483,6 +530,7 @@ where
|
||||
match self {
|
||||
Self::Database(cursor) => Ok(cursor.upsert(key, value)?),
|
||||
Self::StaticFile(_) => Err(ProviderError::UnsupportedProvider),
|
||||
#[cfg(all(unix, feature = "rocksdb"))]
|
||||
Self::RocksDB(batch) => batch.put::<tables::StoragesHistory>(key, value),
|
||||
}
|
||||
}
|
||||
@@ -497,6 +545,7 @@ where
|
||||
Ok(())
|
||||
}
|
||||
Self::StaticFile(_) => Err(ProviderError::UnsupportedProvider),
|
||||
#[cfg(all(unix, feature = "rocksdb"))]
|
||||
Self::RocksDB(batch) => batch.delete::<tables::StoragesHistory>(key),
|
||||
}
|
||||
}
|
||||
@@ -510,6 +559,7 @@ where
|
||||
match self {
|
||||
Self::Database(cursor) => Ok(cursor.append(key, value)?),
|
||||
Self::StaticFile(_) => Err(ProviderError::UnsupportedProvider),
|
||||
#[cfg(all(unix, feature = "rocksdb"))]
|
||||
Self::RocksDB(batch) => batch.put::<tables::StoragesHistory>(key, value),
|
||||
}
|
||||
}
|
||||
@@ -523,6 +573,7 @@ where
|
||||
match self {
|
||||
Self::Database(cursor) => Ok(cursor.upsert(key, value)?),
|
||||
Self::StaticFile(_) => Err(ProviderError::UnsupportedProvider),
|
||||
#[cfg(all(unix, feature = "rocksdb"))]
|
||||
Self::RocksDB(batch) => batch.put::<tables::StoragesHistory>(key, value),
|
||||
}
|
||||
}
|
||||
@@ -537,6 +588,7 @@ where
|
||||
match self {
|
||||
Self::Database(cursor) => Ok(cursor.seek_exact(key)?.map(|(_, v)| v)),
|
||||
Self::StaticFile(_) => Err(ProviderError::UnsupportedProvider),
|
||||
#[cfg(all(unix, feature = "rocksdb"))]
|
||||
Self::RocksDB(batch) => batch.get::<tables::StoragesHistory>(key),
|
||||
}
|
||||
}
|
||||
@@ -555,6 +607,7 @@ where
|
||||
match self {
|
||||
Self::Database(cursor) => Ok(cursor.append(key, value)?),
|
||||
Self::StaticFile(_) => Err(ProviderError::UnsupportedProvider),
|
||||
#[cfg(all(unix, feature = "rocksdb"))]
|
||||
Self::RocksDB(batch) => batch.put::<tables::AccountsHistory>(key, value),
|
||||
}
|
||||
}
|
||||
@@ -568,6 +621,7 @@ where
|
||||
match self {
|
||||
Self::Database(cursor) => Ok(cursor.upsert(key, value)?),
|
||||
Self::StaticFile(_) => Err(ProviderError::UnsupportedProvider),
|
||||
#[cfg(all(unix, feature = "rocksdb"))]
|
||||
Self::RocksDB(batch) => batch.put::<tables::AccountsHistory>(key, value),
|
||||
}
|
||||
}
|
||||
@@ -582,6 +636,7 @@ where
|
||||
Ok(cursor.seek_exact(ShardedKey::last(address))?.map(|(_, v)| v))
|
||||
}
|
||||
Self::StaticFile(_) => Err(ProviderError::UnsupportedProvider),
|
||||
#[cfg(all(unix, feature = "rocksdb"))]
|
||||
Self::RocksDB(batch) => batch.get::<tables::AccountsHistory>(ShardedKey::last(address)),
|
||||
}
|
||||
}
|
||||
@@ -596,6 +651,7 @@ where
|
||||
Ok(())
|
||||
}
|
||||
Self::StaticFile(_) => Err(ProviderError::UnsupportedProvider),
|
||||
#[cfg(all(unix, feature = "rocksdb"))]
|
||||
Self::RocksDB(batch) => batch.delete::<tables::AccountsHistory>(key),
|
||||
}
|
||||
}
|
||||
@@ -624,6 +680,7 @@ where
|
||||
Self::StaticFile(writer) => {
|
||||
writer.append_account_changeset(changeset, block_number)?;
|
||||
}
|
||||
#[cfg(all(unix, feature = "rocksdb"))]
|
||||
Self::RocksDB(_) => return Err(ProviderError::UnsupportedProvider),
|
||||
}
|
||||
|
||||
@@ -658,6 +715,7 @@ where
|
||||
Self::StaticFile(writer) => {
|
||||
writer.append_storage_changeset(changeset, block_number)?;
|
||||
}
|
||||
#[cfg(all(unix, feature = "rocksdb"))]
|
||||
Self::RocksDB(_) => return Err(ProviderError::UnsupportedProvider),
|
||||
}
|
||||
|
||||
@@ -673,6 +731,7 @@ pub enum EitherReader<'a, CURSOR, N> {
|
||||
/// Read from static file
|
||||
StaticFile(StaticFileProvider<N>, PhantomData<&'a ()>),
|
||||
/// Read from `RocksDB` transaction
|
||||
#[cfg(all(unix, feature = "rocksdb"))]
|
||||
RocksDB(&'a crate::providers::rocksdb::RocksTx<'a>),
|
||||
}
|
||||
|
||||
@@ -704,6 +763,7 @@ impl<'a> EitherReader<'a, (), ()> {
|
||||
P: DBProvider + NodePrimitivesProvider + StorageSettingsCache,
|
||||
P::Tx: DbTx,
|
||||
{
|
||||
#[cfg(all(unix, feature = "rocksdb"))]
|
||||
if provider.cached_storage_settings().storage_v2 {
|
||||
return Ok(EitherReader::RocksDB(
|
||||
_rocksdb_tx.expect("storages_history_in_rocksdb requires rocksdb tx"),
|
||||
@@ -725,6 +785,7 @@ impl<'a> EitherReader<'a, (), ()> {
|
||||
P: DBProvider + NodePrimitivesProvider + StorageSettingsCache,
|
||||
P::Tx: DbTx,
|
||||
{
|
||||
#[cfg(all(unix, feature = "rocksdb"))]
|
||||
if provider.cached_storage_settings().storage_v2 {
|
||||
return Ok(EitherReader::RocksDB(
|
||||
_rocksdb_tx.expect("transaction_hash_numbers_in_rocksdb requires rocksdb tx"),
|
||||
@@ -746,6 +807,7 @@ impl<'a> EitherReader<'a, (), ()> {
|
||||
P: DBProvider + NodePrimitivesProvider + StorageSettingsCache,
|
||||
P::Tx: DbTx,
|
||||
{
|
||||
#[cfg(all(unix, feature = "rocksdb"))]
|
||||
if provider.cached_storage_settings().storage_v2 {
|
||||
return Ok(EitherReader::RocksDB(
|
||||
_rocksdb_tx.expect("account_history_in_rocksdb requires rocksdb tx"),
|
||||
@@ -803,6 +865,7 @@ where
|
||||
Some(result.map(|sender| (tx_num, sender)))
|
||||
})
|
||||
.collect::<ProviderResult<HashMap<_, _>>>(),
|
||||
#[cfg(all(unix, feature = "rocksdb"))]
|
||||
Self::RocksDB(_) => Err(ProviderError::UnsupportedProvider),
|
||||
}
|
||||
}
|
||||
@@ -820,6 +883,7 @@ where
|
||||
match self {
|
||||
Self::Database(cursor, _) => Ok(cursor.seek_exact(hash)?.map(|(_, v)| v)),
|
||||
Self::StaticFile(_, _) => Err(ProviderError::UnsupportedProvider),
|
||||
#[cfg(all(unix, feature = "rocksdb"))]
|
||||
Self::RocksDB(tx) => tx.get::<tables::TransactionHashNumbers>(hash),
|
||||
}
|
||||
}
|
||||
@@ -837,6 +901,7 @@ where
|
||||
match self {
|
||||
Self::Database(cursor, _) => Ok(cursor.seek_exact(key)?.map(|(_, v)| v)),
|
||||
Self::StaticFile(_, _) => Err(ProviderError::UnsupportedProvider),
|
||||
#[cfg(all(unix, feature = "rocksdb"))]
|
||||
Self::RocksDB(tx) => tx.get::<tables::StoragesHistory>(key),
|
||||
}
|
||||
}
|
||||
@@ -861,6 +926,7 @@ where
|
||||
)
|
||||
}
|
||||
Self::StaticFile(_, _) => Err(ProviderError::UnsupportedProvider),
|
||||
#[cfg(all(unix, feature = "rocksdb"))]
|
||||
Self::RocksDB(tx) => tx.storage_history_info(
|
||||
address,
|
||||
storage_key,
|
||||
@@ -883,6 +949,7 @@ where
|
||||
match self {
|
||||
Self::Database(cursor, _) => Ok(cursor.seek_exact(key)?.map(|(_, v)| v)),
|
||||
Self::StaticFile(_, _) => Err(ProviderError::UnsupportedProvider),
|
||||
#[cfg(all(unix, feature = "rocksdb"))]
|
||||
Self::RocksDB(tx) => tx.get::<tables::AccountsHistory>(key),
|
||||
}
|
||||
}
|
||||
@@ -906,6 +973,7 @@ where
|
||||
)
|
||||
}
|
||||
Self::StaticFile(_, _) => Err(ProviderError::UnsupportedProvider),
|
||||
#[cfg(all(unix, feature = "rocksdb"))]
|
||||
Self::RocksDB(tx) => {
|
||||
tx.account_history_info(address, block_number, lowest_available_block_number)
|
||||
}
|
||||
@@ -954,6 +1022,7 @@ where
|
||||
entry.map(|(_, account_before)| account_before.address).map_err(Into::into)
|
||||
})
|
||||
.collect(),
|
||||
#[cfg(all(unix, feature = "rocksdb"))]
|
||||
Self::RocksDB(_) => Err(ProviderError::UnsupportedProvider),
|
||||
}
|
||||
}
|
||||
@@ -1126,7 +1195,7 @@ mod tests {
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
#[cfg(all(test, unix, feature = "rocksdb"))]
|
||||
mod rocksdb_tests {
|
||||
use super::*;
|
||||
use crate::{
|
||||
|
||||
@@ -182,10 +182,12 @@ impl<N: ProviderNodeTypes> RocksDBProviderFactory for BlockchainProvider<N> {
|
||||
self.database.rocksdb_provider()
|
||||
}
|
||||
|
||||
#[cfg(all(unix, feature = "rocksdb"))]
|
||||
fn set_pending_rocksdb_batch(&self, _batch: rocksdb::WriteBatchWithTransaction<true>) {
|
||||
unimplemented!("BlockchainProvider wraps ProviderFactory - use DatabaseProvider::set_pending_rocksdb_batch instead")
|
||||
}
|
||||
|
||||
#[cfg(all(unix, feature = "rocksdb"))]
|
||||
fn commit_pending_rocksdb_batches(&self) -> ProviderResult<()> {
|
||||
unimplemented!("BlockchainProvider wraps ProviderFactory - use DatabaseProvider::commit_pending_rocksdb_batches instead")
|
||||
}
|
||||
|
||||
@@ -1287,9 +1287,7 @@ impl<N: ProviderNodeTypes> BlockReaderIdExt for ConsistentProvider<N> {
|
||||
) -> ProviderResult<Option<SealedHeader<HeaderTy<N>>>> {
|
||||
Ok(match id {
|
||||
BlockId::Number(num) => self.sealed_header_by_number_or_tag(num)?,
|
||||
BlockId::Hash(hash) => self
|
||||
.header(hash.block_hash)?
|
||||
.map(|header| SealedHeader::new(header, hash.block_hash)),
|
||||
BlockId::Hash(hash) => self.header(hash.block_hash)?.map(SealedHeader::seal_slow),
|
||||
})
|
||||
}
|
||||
|
||||
|
||||
@@ -28,8 +28,8 @@ use reth_prune_types::{PruneCheckpoint, PruneModes, PruneSegment};
|
||||
use reth_stages_types::{PipelineTarget, StageCheckpoint, StageId};
|
||||
use reth_static_file_types::StaticFileSegment;
|
||||
use reth_storage_api::{
|
||||
BlockBodyIndicesProvider, ChainStateBlockReader, ChainStateBlockWriter, NodePrimitivesProvider,
|
||||
StorageSettings, StorageSettingsCache, TryIntoHistoricalStateProvider,
|
||||
BlockBodyIndicesProvider, NodePrimitivesProvider, StorageSettings, StorageSettingsCache,
|
||||
TryIntoHistoricalStateProvider,
|
||||
};
|
||||
use reth_storage_errors::provider::ProviderResult;
|
||||
use reth_trie::HashedPostState;
|
||||
@@ -40,7 +40,7 @@ use std::{
|
||||
path::Path,
|
||||
sync::Arc,
|
||||
};
|
||||
use tracing::{info, instrument, trace};
|
||||
use tracing::{instrument, trace};
|
||||
|
||||
mod provider;
|
||||
pub use provider::{
|
||||
@@ -195,10 +195,12 @@ impl<N: NodeTypesWithDB> RocksDBProviderFactory for ProviderFactory<N> {
|
||||
self.rocksdb_provider.clone()
|
||||
}
|
||||
|
||||
#[cfg(all(unix, feature = "rocksdb"))]
|
||||
fn set_pending_rocksdb_batch(&self, _batch: rocksdb::WriteBatchWithTransaction<true>) {
|
||||
unimplemented!("ProviderFactory is a factory, not a provider - use DatabaseProvider::set_pending_rocksdb_batch instead")
|
||||
}
|
||||
|
||||
#[cfg(all(unix, feature = "rocksdb"))]
|
||||
fn commit_pending_rocksdb_batches(&self) -> ProviderResult<()> {
|
||||
unimplemented!("ProviderFactory is a factory, not a provider - use DatabaseProvider::commit_pending_rocksdb_batches instead")
|
||||
}
|
||||
@@ -365,59 +367,8 @@ impl<N: ProviderNodeTypes> ProviderFactory<N> {
|
||||
},
|
||||
);
|
||||
|
||||
// Step 4: Heal finalized/safe block numbers that may be ahead of the
|
||||
// highest header on nodes coming from <=1.10.2.
|
||||
//
|
||||
// Unwinds already set it to the target block.
|
||||
if rocksdb_unwind.is_none() && static_file_unwind.is_none() {
|
||||
self.heal_chain_state_block_numbers(&provider_ro)?;
|
||||
}
|
||||
|
||||
Ok((rocksdb_unwind, static_file_unwind))
|
||||
}
|
||||
|
||||
/// If the stored finalized or safe block number is ahead of the highest
|
||||
/// header, resets it to the highest header.
|
||||
fn heal_chain_state_block_numbers(
|
||||
&self,
|
||||
provider_ro: &DatabaseProvider<<N::DB as Database>::TX, N>,
|
||||
) -> ProviderResult<()> {
|
||||
let highest_header = self.last_block_number()?;
|
||||
|
||||
let finalized = provider_ro.last_finalized_block_number()?;
|
||||
let safe = provider_ro.last_safe_block_number()?;
|
||||
|
||||
if finalized.is_none_or(|f| f <= highest_header) && safe.is_none_or(|s| s <= highest_header)
|
||||
{
|
||||
return Ok(());
|
||||
}
|
||||
|
||||
let provider_rw = self.provider_rw()?;
|
||||
|
||||
if let Some(finalized) = finalized.filter(|&f| f > highest_header) {
|
||||
info!(
|
||||
target: "providers::db",
|
||||
finalized,
|
||||
highest_header,
|
||||
"Healing finalized block number",
|
||||
);
|
||||
provider_rw.save_finalized_block_number(highest_header)?;
|
||||
}
|
||||
|
||||
if let Some(safe) = safe.filter(|&s| s > highest_header) {
|
||||
info!(
|
||||
target: "providers::db",
|
||||
safe,
|
||||
highest_header,
|
||||
"Healing safe block number",
|
||||
);
|
||||
provider_rw.save_safe_block_number(highest_header)?;
|
||||
}
|
||||
|
||||
provider_rw.commit()?;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
impl<N: NodeTypesWithDB> NodePrimitivesProvider for ProviderFactory<N> {
|
||||
|
||||
@@ -205,6 +205,7 @@ pub struct DatabaseProvider<TX, N: NodeTypes> {
|
||||
/// Path to the database directory.
|
||||
db_path: PathBuf,
|
||||
/// Pending `RocksDB` batches to be committed at provider commit time.
|
||||
#[cfg_attr(not(all(unix, feature = "rocksdb")), allow(dead_code))]
|
||||
pending_rocksdb_batches: PendingRocksDBBatches,
|
||||
/// Commit order for database operations.
|
||||
commit_order: CommitOrder,
|
||||
@@ -322,10 +323,12 @@ impl<TX, N: NodeTypes> RocksDBProviderFactory for DatabaseProvider<TX, N> {
|
||||
self.rocksdb_provider.clone()
|
||||
}
|
||||
|
||||
#[cfg(all(unix, feature = "rocksdb"))]
|
||||
fn set_pending_rocksdb_batch(&self, batch: rocksdb::WriteBatchWithTransaction<true>) {
|
||||
self.pending_rocksdb_batches.lock().push(batch);
|
||||
}
|
||||
|
||||
#[cfg(all(unix, feature = "rocksdb"))]
|
||||
fn commit_pending_rocksdb_batches(&self) -> ProviderResult<()> {
|
||||
let batches = std::mem::take(&mut *self.pending_rocksdb_batches.lock());
|
||||
for batch in batches {
|
||||
@@ -452,11 +455,16 @@ impl<TX: DbTx + DbTxMut + 'static, N: NodeTypesForProvider> DatabaseProvider<TX,
|
||||
where
|
||||
F: FnOnce(RocksBatchArg<'_>) -> ProviderResult<(R, Option<RawRocksDBBatch>)>,
|
||||
{
|
||||
#[cfg(all(unix, feature = "rocksdb"))]
|
||||
let rocksdb = self.rocksdb_provider();
|
||||
#[cfg(all(unix, feature = "rocksdb"))]
|
||||
let rocksdb_batch = rocksdb.batch();
|
||||
#[cfg(not(all(unix, feature = "rocksdb")))]
|
||||
let rocksdb_batch = ();
|
||||
|
||||
let (result, raw_batch) = f(rocksdb_batch)?;
|
||||
|
||||
#[cfg(all(unix, feature = "rocksdb"))]
|
||||
if let Some(batch) = raw_batch {
|
||||
self.set_pending_rocksdb_batch(batch);
|
||||
}
|
||||
@@ -495,6 +503,7 @@ impl<TX: DbTx + DbTxMut + 'static, N: NodeTypesForProvider> DatabaseProvider<TX,
|
||||
}
|
||||
|
||||
/// Creates the context for `RocksDB` writes.
|
||||
#[cfg_attr(not(all(unix, feature = "rocksdb")), allow(dead_code))]
|
||||
fn rocksdb_write_ctx(&self, first_block: BlockNumber) -> RocksDBWriteCtx {
|
||||
RocksDBWriteCtx {
|
||||
first_block_number: first_block,
|
||||
@@ -553,11 +562,15 @@ impl<TX: DbTx + DbTxMut + 'static, N: NodeTypesForProvider> DatabaseProvider<TX,
|
||||
// avoid capturing &self.tx in scope below.
|
||||
let sf_provider = &self.static_file_provider;
|
||||
let sf_ctx = self.static_file_write_ctx(save_mode, first_number, last_block_number)?;
|
||||
#[cfg(all(unix, feature = "rocksdb"))]
|
||||
let rocksdb_provider = self.rocksdb_provider.clone();
|
||||
#[cfg(all(unix, feature = "rocksdb"))]
|
||||
let rocksdb_ctx = self.rocksdb_write_ctx(first_number);
|
||||
#[cfg(all(unix, feature = "rocksdb"))]
|
||||
let rocksdb_enabled = rocksdb_ctx.storage_settings.storage_v2;
|
||||
|
||||
let mut sf_result = None;
|
||||
#[cfg(all(unix, feature = "rocksdb"))]
|
||||
let mut rocksdb_result = None;
|
||||
|
||||
// Write to all backends in parallel.
|
||||
@@ -578,6 +591,7 @@ impl<TX: DbTx + DbTxMut + 'static, N: NodeTypesForProvider> DatabaseProvider<TX,
|
||||
});
|
||||
|
||||
// RocksDB writes
|
||||
#[cfg(all(unix, feature = "rocksdb"))]
|
||||
if rocksdb_enabled {
|
||||
s.spawn(|_| {
|
||||
let _guard = span.enter();
|
||||
@@ -699,6 +713,7 @@ impl<TX: DbTx + DbTxMut + 'static, N: NodeTypesForProvider> DatabaseProvider<TX,
|
||||
// Collect results from spawned tasks
|
||||
timings.sf = sf_result.ok_or(StaticFileWriterError::ThreadPanic("static file"))??;
|
||||
|
||||
#[cfg(all(unix, feature = "rocksdb"))]
|
||||
if rocksdb_enabled {
|
||||
timings.rocksdb = rocksdb_result.ok_or_else(|| {
|
||||
ProviderError::Database(reth_db_api::DatabaseError::Other(
|
||||
@@ -3256,8 +3271,11 @@ impl<TX: DbTxMut + DbTx + 'static, N: NodeTypes> HistoryWriter for DatabaseProvi
|
||||
last_indices.sort_unstable_by_key(|(a, _)| *a);
|
||||
|
||||
if self.cached_storage_settings().storage_v2 {
|
||||
let batch = self.rocksdb_provider.unwind_account_history_indices(&last_indices)?;
|
||||
self.pending_rocksdb_batches.lock().push(batch);
|
||||
#[cfg(all(unix, feature = "rocksdb"))]
|
||||
{
|
||||
let batch = self.rocksdb_provider.unwind_account_history_indices(&last_indices)?;
|
||||
self.pending_rocksdb_batches.lock().push(batch);
|
||||
}
|
||||
} else {
|
||||
// Unwind the account history index in MDBX.
|
||||
let mut cursor = self.tx.cursor_write::<tables::AccountsHistory>()?;
|
||||
@@ -3313,9 +3331,12 @@ impl<TX: DbTxMut + DbTx + 'static, N: NodeTypes> HistoryWriter for DatabaseProvi
|
||||
storage_changesets.sort_by_key(|(address, key, _)| (*address, *key));
|
||||
|
||||
if self.cached_storage_settings().storage_v2 {
|
||||
let batch =
|
||||
self.rocksdb_provider.unwind_storage_history_indices(&storage_changesets)?;
|
||||
self.pending_rocksdb_batches.lock().push(batch);
|
||||
#[cfg(all(unix, feature = "rocksdb"))]
|
||||
{
|
||||
let batch =
|
||||
self.rocksdb_provider.unwind_storage_history_indices(&storage_changesets)?;
|
||||
self.pending_rocksdb_batches.lock().push(batch);
|
||||
}
|
||||
} else {
|
||||
// Unwind the storage history index in MDBX.
|
||||
let mut cursor = self.tx.cursor_write::<tables::StoragesHistory>()?;
|
||||
@@ -3672,6 +3693,7 @@ impl<TX: DbTxMut + DbTx + 'static, N: NodeTypesForProvider> BlockWriter
|
||||
// append_*_history_shard which handles read-merge-write internally.
|
||||
let storage_settings = self.cached_storage_settings();
|
||||
if storage_settings.storage_v2 {
|
||||
#[cfg(all(unix, feature = "rocksdb"))]
|
||||
self.with_rocksdb_batch(|mut batch| {
|
||||
for (address, blocks) in account_transitions {
|
||||
batch.append_account_history_shard(address, blocks)?;
|
||||
@@ -3682,6 +3704,7 @@ impl<TX: DbTxMut + DbTx + 'static, N: NodeTypesForProvider> BlockWriter
|
||||
self.insert_account_history_index(account_transitions)?;
|
||||
}
|
||||
if storage_settings.storage_v2 {
|
||||
#[cfg(all(unix, feature = "rocksdb"))]
|
||||
self.with_rocksdb_batch(|mut batch| {
|
||||
for ((address, key), blocks) in storage_transitions {
|
||||
batch.append_storage_history_shard(address, key, blocks)?;
|
||||
@@ -3818,9 +3841,12 @@ impl<TX: DbTx + 'static, N: NodeTypes + 'static> DBProvider for DatabaseProvider
|
||||
if self.static_file_provider.has_unwind_queued() || self.commit_order.is_unwind() {
|
||||
self.tx.commit()?;
|
||||
|
||||
let batches = std::mem::take(&mut *self.pending_rocksdb_batches.lock());
|
||||
for batch in batches {
|
||||
self.rocksdb_provider.commit_batch(batch)?;
|
||||
#[cfg(all(unix, feature = "rocksdb"))]
|
||||
{
|
||||
let batches = std::mem::take(&mut *self.pending_rocksdb_batches.lock());
|
||||
for batch in batches {
|
||||
self.rocksdb_provider.commit_batch(batch)?;
|
||||
}
|
||||
}
|
||||
|
||||
self.static_file_provider.commit()?;
|
||||
@@ -3832,12 +3858,15 @@ impl<TX: DbTx + 'static, N: NodeTypes + 'static> DBProvider for DatabaseProvider
|
||||
self.static_file_provider.finalize()?;
|
||||
timings.sf = start.elapsed();
|
||||
|
||||
let start = Instant::now();
|
||||
let batches = std::mem::take(&mut *self.pending_rocksdb_batches.lock());
|
||||
for batch in batches {
|
||||
self.rocksdb_provider.commit_batch(batch)?;
|
||||
#[cfg(all(unix, feature = "rocksdb"))]
|
||||
{
|
||||
let start = Instant::now();
|
||||
let batches = std::mem::take(&mut *self.pending_rocksdb_batches.lock());
|
||||
for batch in batches {
|
||||
self.rocksdb_provider.commit_batch(batch)?;
|
||||
}
|
||||
timings.rocksdb = start.elapsed();
|
||||
}
|
||||
timings.rocksdb = start.elapsed();
|
||||
|
||||
let start = Instant::now();
|
||||
self.tx.commit()?;
|
||||
@@ -5106,24 +5135,28 @@ mod tests {
|
||||
}
|
||||
}
|
||||
|
||||
let rocksdb = factory.rocksdb_provider();
|
||||
for block_num in 1..=num_blocks {
|
||||
for acct_idx in 0..accounts_per_block {
|
||||
let address = Address::with_last_byte((block_num * 10 + acct_idx as u64) as u8);
|
||||
let shards = rocksdb.account_history_shards(address).unwrap();
|
||||
assert!(
|
||||
!shards.is_empty(),
|
||||
"v2: RocksDB AccountsHistory missing for block {block_num} acct {acct_idx}"
|
||||
);
|
||||
|
||||
for s in 1..=slots_per_account as u64 {
|
||||
let slot = U256::from(s + acct_idx as u64 * 100);
|
||||
let slot_key = B256::from(slot);
|
||||
let shards = rocksdb.storage_history_shards(address, slot_key).unwrap();
|
||||
#[cfg(all(unix, feature = "rocksdb"))]
|
||||
{
|
||||
let rocksdb = factory.rocksdb_provider();
|
||||
for block_num in 1..=num_blocks {
|
||||
for acct_idx in 0..accounts_per_block {
|
||||
let address =
|
||||
Address::with_last_byte((block_num * 10 + acct_idx as u64) as u8);
|
||||
let shards = rocksdb.account_history_shards(address).unwrap();
|
||||
assert!(
|
||||
!shards.is_empty(),
|
||||
"v2: RocksDB StoragesHistory missing for block {block_num} acct {acct_idx} slot {s}"
|
||||
"v2: RocksDB AccountsHistory missing for block {block_num} acct {acct_idx}"
|
||||
);
|
||||
|
||||
for s in 1..=slots_per_account as u64 {
|
||||
let slot = U256::from(s + acct_idx as u64 * 100);
|
||||
let slot_key = B256::from(slot);
|
||||
let shards = rocksdb.storage_history_shards(address, slot_key).unwrap();
|
||||
assert!(
|
||||
!shards.is_empty(),
|
||||
"v2: RocksDB StoragesHistory missing for block {block_num} acct {acct_idx} slot {s}"
|
||||
);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -5334,6 +5367,7 @@ mod tests {
|
||||
}
|
||||
|
||||
#[test]
|
||||
#[cfg(all(unix, feature = "rocksdb"))]
|
||||
fn test_unwind_storage_history_indices_v2() {
|
||||
let factory = create_test_provider_factory();
|
||||
factory.set_storage_settings_cache(StorageSettings::v2());
|
||||
|
||||
@@ -32,6 +32,10 @@ pub use blockchain_provider::BlockchainProvider;
|
||||
mod consistent;
|
||||
pub use consistent::ConsistentProvider;
|
||||
|
||||
// RocksDB currently only supported on Unix platforms
|
||||
// Windows support is planned for future releases
|
||||
#[cfg_attr(all(unix, feature = "rocksdb"), path = "rocksdb/mod.rs")]
|
||||
#[cfg_attr(not(all(unix, feature = "rocksdb")), path = "rocksdb_stub.rs")]
|
||||
pub(crate) mod rocksdb;
|
||||
|
||||
pub use rocksdb::{
|
||||
|
||||
239
crates/storage/provider/src/providers/rocksdb_stub.rs
Normal file
239
crates/storage/provider/src/providers/rocksdb_stub.rs
Normal file
@@ -0,0 +1,239 @@
|
||||
//! Stub implementation of `RocksDB` provider.
|
||||
//!
|
||||
//! This module provides placeholder types that allow the code to compile when `RocksDB` is not
|
||||
//! available (either on non-Unix platforms or when the `rocksdb` feature is not enabled).
|
||||
//! All method calls are cfg-guarded in the calling code, so only type definitions are needed here.
|
||||
|
||||
use alloy_primitives::BlockNumber;
|
||||
use metrics::Label;
|
||||
use parking_lot::Mutex;
|
||||
use reth_db_api::{database_metrics::DatabaseMetrics, models::StorageSettings};
|
||||
use reth_prune_types::PruneMode;
|
||||
use reth_storage_errors::{db::LogLevel, provider::ProviderResult};
|
||||
use std::{path::Path, sync::Arc};
|
||||
|
||||
/// Pending `RocksDB` batches type alias (stub - uses unit type).
|
||||
pub(crate) type PendingRocksDBBatches = Arc<Mutex<Vec<()>>>;
|
||||
|
||||
/// Statistics for a single `RocksDB` table (column family) - stub.
|
||||
#[derive(Debug, Clone)]
|
||||
pub struct RocksDBTableStats {
|
||||
/// Size of SST files on disk in bytes.
|
||||
pub sst_size_bytes: u64,
|
||||
/// Size of memtables in memory in bytes.
|
||||
pub memtable_size_bytes: u64,
|
||||
/// Name of the table/column family.
|
||||
pub name: String,
|
||||
/// Estimated number of keys in the table.
|
||||
pub estimated_num_keys: u64,
|
||||
/// Estimated size of live data in bytes (SST files + memtables).
|
||||
pub estimated_size_bytes: u64,
|
||||
/// Estimated bytes pending compaction (reclaimable space).
|
||||
pub pending_compaction_bytes: u64,
|
||||
}
|
||||
|
||||
/// Database-level statistics for `RocksDB` - stub.
|
||||
#[derive(Debug, Clone)]
|
||||
pub struct RocksDBStats {
|
||||
/// Statistics for each table (column family).
|
||||
pub tables: Vec<RocksDBTableStats>,
|
||||
/// Total size of WAL (Write-Ahead Log) files in bytes.
|
||||
pub wal_size_bytes: u64,
|
||||
}
|
||||
|
||||
/// Context for `RocksDB` block writes (stub).
|
||||
#[derive(Debug, Clone)]
|
||||
#[allow(dead_code)]
|
||||
pub(crate) struct RocksDBWriteCtx {
|
||||
/// The first block number being written.
|
||||
pub first_block_number: BlockNumber,
|
||||
/// The prune mode for transaction lookup, if any.
|
||||
pub prune_tx_lookup: Option<PruneMode>,
|
||||
/// Storage settings determining what goes to `RocksDB`.
|
||||
pub storage_settings: StorageSettings,
|
||||
/// Pending batches (stub - unused).
|
||||
pub pending_batches: PendingRocksDBBatches,
|
||||
}
|
||||
|
||||
/// A stub `RocksDB` provider.
|
||||
///
|
||||
/// This type exists to allow code to compile when `RocksDB` is not available (either on non-Unix
|
||||
/// platforms or when the `rocksdb` feature is not enabled). All method calls on `RocksDBProvider`
|
||||
/// are cfg-guarded in the calling code, so this stub only provides type definitions.
|
||||
#[derive(Debug, Clone)]
|
||||
pub struct RocksDBProvider;
|
||||
|
||||
impl RocksDBProvider {
|
||||
/// Creates a new stub `RocksDB` provider.
|
||||
pub fn new(_path: impl AsRef<Path>) -> ProviderResult<Self> {
|
||||
Ok(Self)
|
||||
}
|
||||
|
||||
/// Creates a new stub `RocksDB` provider builder.
|
||||
pub fn builder(path: impl AsRef<Path>) -> RocksDBBuilder {
|
||||
RocksDBBuilder::new(path)
|
||||
}
|
||||
|
||||
/// Returns `true` if a `RocksDB` database exists at the given path (stub implementation).
|
||||
///
|
||||
/// Always returns `false` since `RocksDB` is not available.
|
||||
pub fn exists(_path: impl AsRef<Path>) -> bool {
|
||||
false
|
||||
}
|
||||
|
||||
/// Check consistency of `RocksDB` tables (stub implementation).
|
||||
///
|
||||
/// Returns `None` since there is no `RocksDB` data to check when the feature is disabled.
|
||||
pub const fn check_consistency<Provider>(
|
||||
&self,
|
||||
_provider: &Provider,
|
||||
) -> ProviderResult<Option<BlockNumber>> {
|
||||
Ok(None)
|
||||
}
|
||||
|
||||
/// Returns statistics for all column families in the database (stub implementation).
|
||||
///
|
||||
/// Returns an empty vector since there is no `RocksDB` when the feature is disabled.
|
||||
pub const fn table_stats(&self) -> Vec<RocksDBTableStats> {
|
||||
Vec::new()
|
||||
}
|
||||
|
||||
/// Clears all entries from the specified table (stub implementation).
|
||||
///
|
||||
/// This is a no-op since there is no `RocksDB` when the feature is disabled.
|
||||
pub const fn clear<T>(&self) -> ProviderResult<()> {
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Returns the total size of WAL (Write-Ahead Log) files in bytes (stub implementation).
|
||||
///
|
||||
/// Returns 0 since there is no `RocksDB` when the feature is disabled.
|
||||
pub const fn wal_size_bytes(&self) -> u64 {
|
||||
0
|
||||
}
|
||||
|
||||
/// Returns database-level statistics including per-table stats and WAL size (stub
|
||||
/// implementation).
|
||||
///
|
||||
/// Returns empty stats since there is no `RocksDB` when the feature is disabled.
|
||||
pub const fn db_stats(&self) -> RocksDBStats {
|
||||
RocksDBStats { tables: Vec::new(), wal_size_bytes: 0 }
|
||||
}
|
||||
|
||||
/// Flushes all pending writes to disk (stub implementation).
|
||||
///
|
||||
/// This is a no-op since there is no `RocksDB` when the feature is disabled.
|
||||
pub const fn flush(&self, _tables: &[&'static str]) -> ProviderResult<()> {
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Creates an iterator over all entries in the specified table (stub implementation).
|
||||
///
|
||||
/// Returns an empty iterator since there is no `RocksDB` when the feature is disabled.
|
||||
pub const fn iter<T: reth_db_api::table::Table>(&self) -> ProviderResult<RocksDBIter<T>> {
|
||||
Ok(RocksDBIter(std::marker::PhantomData))
|
||||
}
|
||||
}
|
||||
|
||||
impl DatabaseMetrics for RocksDBProvider {
|
||||
fn gauge_metrics(&self) -> Vec<(&'static str, f64, Vec<Label>)> {
|
||||
vec![]
|
||||
}
|
||||
}
|
||||
|
||||
/// A stub batch writer for `RocksDB`.
|
||||
#[derive(Debug)]
|
||||
pub struct RocksDBBatch;
|
||||
|
||||
/// A stub builder for `RocksDB`.
|
||||
#[derive(Debug)]
|
||||
pub struct RocksDBBuilder;
|
||||
|
||||
impl RocksDBBuilder {
|
||||
/// Creates a new stub builder.
|
||||
pub fn new<P: AsRef<Path>>(_path: P) -> Self {
|
||||
Self
|
||||
}
|
||||
|
||||
/// Adds a column family for a specific table type (stub implementation).
|
||||
pub const fn with_table<T>(self) -> Self {
|
||||
self
|
||||
}
|
||||
|
||||
/// Registers the default tables used by reth for `RocksDB` storage (stub implementation).
|
||||
pub const fn with_default_tables(self) -> Self {
|
||||
self
|
||||
}
|
||||
|
||||
/// Enables metrics (stub implementation).
|
||||
pub const fn with_metrics(self) -> Self {
|
||||
self
|
||||
}
|
||||
|
||||
/// Enables `RocksDB` internal statistics collection (stub implementation).
|
||||
pub const fn with_statistics(self) -> Self {
|
||||
self
|
||||
}
|
||||
|
||||
/// Sets the log level from `DatabaseArgs` configuration (stub implementation).
|
||||
pub const fn with_database_log_level(self, _log_level: Option<LogLevel>) -> Self {
|
||||
self
|
||||
}
|
||||
|
||||
/// Sets a custom block cache size (stub implementation).
|
||||
pub const fn with_block_cache_size(self, _capacity_bytes: usize) -> Self {
|
||||
self
|
||||
}
|
||||
|
||||
/// Sets read-only mode (stub implementation).
|
||||
pub const fn with_read_only(self, _read_only: bool) -> Self {
|
||||
self
|
||||
}
|
||||
|
||||
/// Build the `RocksDB` provider (stub implementation).
|
||||
pub const fn build(self) -> ProviderResult<RocksDBProvider> {
|
||||
Ok(RocksDBProvider)
|
||||
}
|
||||
}
|
||||
|
||||
/// A stub transaction for `RocksDB`.
|
||||
#[derive(Debug)]
|
||||
pub struct RocksTx;
|
||||
|
||||
/// A stub raw iterator for `RocksDB`.
|
||||
#[derive(Debug)]
|
||||
pub struct RocksDBRawIter;
|
||||
|
||||
/// A stub typed iterator for `RocksDB`.
|
||||
#[derive(Debug)]
|
||||
pub struct RocksDBIter<T: reth_db_api::table::Table>(std::marker::PhantomData<T>);
|
||||
|
||||
impl<T: reth_db_api::table::Table> Iterator for RocksDBIter<T> {
|
||||
type Item = ProviderResult<(T::Key, T::Value)>;
|
||||
|
||||
fn next(&mut self) -> Option<Self::Item> {
|
||||
None
|
||||
}
|
||||
}
|
||||
|
||||
/// Outcome of pruning a history shard in `RocksDB` (stub).
|
||||
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
|
||||
pub enum PruneShardOutcome {
|
||||
/// Shard was deleted entirely.
|
||||
Deleted,
|
||||
/// Shard was updated with filtered block numbers.
|
||||
Updated,
|
||||
/// Shard was unchanged (no blocks <= `to_block`).
|
||||
Unchanged,
|
||||
}
|
||||
|
||||
/// Tracks pruning outcomes for batch operations (stub).
|
||||
#[derive(Debug, Default, Clone, Copy)]
|
||||
pub struct PrunedIndices {
|
||||
/// Number of shards completely deleted.
|
||||
pub deleted: usize,
|
||||
/// Number of shards that were updated (filtered but still have entries).
|
||||
pub updated: usize,
|
||||
/// Number of shards that were unchanged.
|
||||
pub unchanged: usize,
|
||||
}
|
||||
@@ -1170,6 +1170,7 @@ mod tests {
|
||||
}
|
||||
|
||||
#[test]
|
||||
#[cfg(all(unix, feature = "rocksdb"))]
|
||||
fn history_provider_get_storage_hashed_state() {
|
||||
use crate::BlockWriter;
|
||||
use alloy_primitives::keccak256;
|
||||
|
||||
@@ -30,8 +30,10 @@ impl<C: Send + Sync, N: NodePrimitives> RocksDBProviderFactory for NoopProvider<
|
||||
RocksDBProvider::builder(PathBuf::default()).build().unwrap()
|
||||
}
|
||||
|
||||
#[cfg(all(unix, feature = "rocksdb"))]
|
||||
fn set_pending_rocksdb_batch(&self, _batch: rocksdb::WriteBatchWithTransaction<true>) {}
|
||||
|
||||
#[cfg(all(unix, feature = "rocksdb"))]
|
||||
fn commit_pending_rocksdb_batches(&self) -> ProviderResult<()> {
|
||||
Ok(())
|
||||
}
|
||||
|
||||
@@ -16,6 +16,7 @@ pub trait RocksDBProviderFactory {
|
||||
///
|
||||
/// This allows deferring `RocksDB` commits to happen at the same time as MDBX and static file
|
||||
/// commits, ensuring atomicity across all storage backends.
|
||||
#[cfg(all(unix, feature = "rocksdb"))]
|
||||
fn set_pending_rocksdb_batch(&self, batch: rocksdb::WriteBatchWithTransaction<true>);
|
||||
|
||||
/// Takes all pending `RocksDB` batches and commits them.
|
||||
@@ -23,6 +24,7 @@ pub trait RocksDBProviderFactory {
|
||||
/// This drains the pending batches from the lock and commits each one using the `RocksDB`
|
||||
/// provider. Can be called before flush to persist `RocksDB` writes independently of the
|
||||
/// full commit path.
|
||||
#[cfg(all(unix, feature = "rocksdb"))]
|
||||
fn commit_pending_rocksdb_batches(&self) -> ProviderResult<()>;
|
||||
|
||||
/// Executes a closure with a `RocksDB` transaction for reading.
|
||||
@@ -35,12 +37,17 @@ pub trait RocksDBProviderFactory {
|
||||
Self: StorageSettingsCache,
|
||||
F: FnOnce(RocksTxRefArg<'_>) -> ProviderResult<R>,
|
||||
{
|
||||
if self.cached_storage_settings().storage_v2 {
|
||||
let rocksdb = self.rocksdb_provider();
|
||||
let tx = rocksdb.tx();
|
||||
return f(Some(&tx));
|
||||
#[cfg(all(unix, feature = "rocksdb"))]
|
||||
{
|
||||
if self.cached_storage_settings().storage_v2 {
|
||||
let rocksdb = self.rocksdb_provider();
|
||||
let tx = rocksdb.tx();
|
||||
return f(Some(&tx));
|
||||
}
|
||||
f(None)
|
||||
}
|
||||
f(None)
|
||||
#[cfg(not(all(unix, feature = "rocksdb")))]
|
||||
f(())
|
||||
}
|
||||
|
||||
/// Executes a closure with a `RocksDB` batch, automatically registering it for commit.
|
||||
@@ -50,13 +57,21 @@ pub trait RocksDBProviderFactory {
|
||||
where
|
||||
F: FnOnce(RocksBatchArg<'_>) -> ProviderResult<(R, Option<RawRocksDBBatch>)>,
|
||||
{
|
||||
let rocksdb = self.rocksdb_provider();
|
||||
let batch = rocksdb.batch();
|
||||
let (result, raw_batch) = f(batch)?;
|
||||
if let Some(b) = raw_batch {
|
||||
self.set_pending_rocksdb_batch(b);
|
||||
#[cfg(all(unix, feature = "rocksdb"))]
|
||||
{
|
||||
let rocksdb = self.rocksdb_provider();
|
||||
let batch = rocksdb.batch();
|
||||
let (result, raw_batch) = f(batch)?;
|
||||
if let Some(b) = raw_batch {
|
||||
self.set_pending_rocksdb_batch(b);
|
||||
}
|
||||
Ok(result)
|
||||
}
|
||||
#[cfg(not(all(unix, feature = "rocksdb")))]
|
||||
{
|
||||
let (result, _) = f(())?;
|
||||
Ok(result)
|
||||
}
|
||||
Ok(result)
|
||||
}
|
||||
|
||||
/// Executes a closure with a `RocksDB` batch that auto-commits on threshold.
|
||||
@@ -68,17 +83,25 @@ pub trait RocksDBProviderFactory {
|
||||
where
|
||||
F: FnOnce(RocksBatchArg<'_>) -> ProviderResult<(R, Option<RawRocksDBBatch>)>,
|
||||
{
|
||||
let rocksdb = self.rocksdb_provider();
|
||||
let batch = rocksdb.batch_with_auto_commit();
|
||||
let (result, raw_batch) = f(batch)?;
|
||||
if let Some(b) = raw_batch {
|
||||
self.set_pending_rocksdb_batch(b);
|
||||
#[cfg(all(unix, feature = "rocksdb"))]
|
||||
{
|
||||
let rocksdb = self.rocksdb_provider();
|
||||
let batch = rocksdb.batch_with_auto_commit();
|
||||
let (result, raw_batch) = f(batch)?;
|
||||
if let Some(b) = raw_batch {
|
||||
self.set_pending_rocksdb_batch(b);
|
||||
}
|
||||
Ok(result)
|
||||
}
|
||||
#[cfg(not(all(unix, feature = "rocksdb")))]
|
||||
{
|
||||
let (result, _) = f(())?;
|
||||
Ok(result)
|
||||
}
|
||||
Ok(result)
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
#[cfg(all(test, unix, feature = "rocksdb"))]
|
||||
mod tests {
|
||||
use super::*;
|
||||
use reth_db_api::models::StorageSettings;
|
||||
|
||||
@@ -36,6 +36,8 @@ serde_json = { workspace = true, optional = true }
|
||||
|
||||
[features]
|
||||
default = ["std"]
|
||||
rocksdb = ["reth-db-api/rocksdb"]
|
||||
edge = ["rocksdb"]
|
||||
std = [
|
||||
"reth-chainspec/std",
|
||||
"alloy-consensus/std",
|
||||
|
||||
@@ -740,8 +740,7 @@ where
|
||||
sender: Address,
|
||||
nonce: u64,
|
||||
) -> Option<Arc<ValidPoolTransaction<Self::Transaction>>> {
|
||||
let sender_id = self.pool.sender_id(&sender)?;
|
||||
let transaction_id = TransactionId::new(sender_id, nonce);
|
||||
let transaction_id = TransactionId::new(self.pool.get_sender_id(sender), nonce);
|
||||
|
||||
self.inner().get_pool_data().all().get(&transaction_id).map(|tx| tx.transaction.clone())
|
||||
}
|
||||
|
||||
@@ -215,24 +215,11 @@ where
|
||||
self.notify_on_transaction_updates(outcome.promoted, outcome.discarded);
|
||||
}
|
||||
|
||||
/// Returns the internal [`SenderId`] for this address, allocating a new mapping when the
|
||||
/// address is first observed.
|
||||
///
|
||||
/// This must only be used on paths that intentionally begin tracking a sender, such as
|
||||
/// transaction insertion. Read-only lookups should prefer [`Self::sender_id`] to avoid
|
||||
/// growing the sender-id map for unknown addresses.
|
||||
/// Returns the internal [`SenderId`] for this address
|
||||
pub fn get_sender_id(&self, addr: Address) -> SenderId {
|
||||
self.identifiers.write().sender_id_or_create(addr)
|
||||
}
|
||||
|
||||
/// Returns the internal [`SenderId`] for this address if it is already tracked.
|
||||
///
|
||||
/// Unlike [`Self::get_sender_id`], this never allocates a new sender mapping and is therefore
|
||||
/// suitable for read-only queries or best-effort cleanup on unknown addresses.
|
||||
pub fn sender_id(&self, addr: &Address) -> Option<SenderId> {
|
||||
self.identifiers.read().sender_id(addr)
|
||||
}
|
||||
|
||||
/// Returns the internal [`SenderId`]s for the given addresses.
|
||||
pub fn get_sender_ids(&self, addrs: impl IntoIterator<Item = Address>) -> Vec<SenderId> {
|
||||
self.identifiers.write().sender_ids_or_create(addrs)
|
||||
@@ -1091,7 +1078,7 @@ where
|
||||
&self,
|
||||
sender: Address,
|
||||
) -> Vec<Arc<ValidPoolTransaction<T::Transaction>>> {
|
||||
let Some(sender_id) = self.sender_id(&sender) else { return Vec::new() };
|
||||
let sender_id = self.get_sender_id(sender);
|
||||
let removed = self.pool.write().remove_transactions_by_sender(sender_id);
|
||||
|
||||
self.with_event_listener(|listener| listener.discarded_many(&removed));
|
||||
@@ -1136,7 +1123,7 @@ where
|
||||
&self,
|
||||
sender: Address,
|
||||
) -> Vec<Arc<ValidPoolTransaction<T::Transaction>>> {
|
||||
let Some(sender_id) = self.sender_id(&sender) else { return Vec::new() };
|
||||
let sender_id = self.get_sender_id(sender);
|
||||
self.get_pool_data().get_transactions_by_sender(sender_id)
|
||||
}
|
||||
|
||||
@@ -1146,7 +1133,7 @@ where
|
||||
sender: Address,
|
||||
nonce: u64,
|
||||
) -> Option<Arc<ValidPoolTransaction<T::Transaction>>> {
|
||||
let sender_id = self.sender_id(&sender)?;
|
||||
let sender_id = self.get_sender_id(sender);
|
||||
self.get_pool_data().get_pending_transaction_by_sender_and_nonce(sender_id, nonce)
|
||||
}
|
||||
|
||||
@@ -1155,7 +1142,7 @@ where
|
||||
&self,
|
||||
sender: Address,
|
||||
) -> Vec<Arc<ValidPoolTransaction<T::Transaction>>> {
|
||||
let Some(sender_id) = self.sender_id(&sender) else { return Vec::new() };
|
||||
let sender_id = self.get_sender_id(sender);
|
||||
self.get_pool_data().queued_txs_by_sender(sender_id)
|
||||
}
|
||||
|
||||
@@ -1172,7 +1159,7 @@ where
|
||||
&self,
|
||||
sender: Address,
|
||||
) -> Vec<Arc<ValidPoolTransaction<T::Transaction>>> {
|
||||
let Some(sender_id) = self.sender_id(&sender) else { return Vec::new() };
|
||||
let sender_id = self.get_sender_id(sender);
|
||||
self.get_pool_data().pending_txs_by_sender(sender_id)
|
||||
}
|
||||
|
||||
@@ -1181,7 +1168,7 @@ where
|
||||
&self,
|
||||
sender: Address,
|
||||
) -> Option<Arc<ValidPoolTransaction<T::Transaction>>> {
|
||||
let sender_id = self.sender_id(&sender)?;
|
||||
let sender_id = self.get_sender_id(sender);
|
||||
self.get_pool_data().get_highest_transaction_by_sender(sender_id)
|
||||
}
|
||||
|
||||
@@ -1191,7 +1178,7 @@ where
|
||||
sender: Address,
|
||||
on_chain_nonce: u64,
|
||||
) -> Option<Arc<ValidPoolTransaction<T::Transaction>>> {
|
||||
let sender_id = self.sender_id(&sender)?;
|
||||
let sender_id = self.get_sender_id(sender);
|
||||
self.get_pool_data().get_highest_consecutive_transaction_by_sender(
|
||||
sender_id.into_transaction_id(on_chain_nonce),
|
||||
)
|
||||
@@ -1760,20 +1747,4 @@ mod tests {
|
||||
let identifiers = test_pool.identifiers.read();
|
||||
assert_eq!(identifiers.sender_id(&auth), Some(SenderId::from(1)));
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn sender_queries_do_not_allocate_ids_for_unknown_addresses() {
|
||||
let test_pool = &TestPoolBuilder::default().with_config(Default::default()).pool;
|
||||
let sender = Address::new([9; 20]);
|
||||
|
||||
assert_eq!(test_pool.sender_id(&sender), None);
|
||||
assert!(test_pool.get_transactions_by_sender(sender).is_empty());
|
||||
assert!(test_pool.get_pending_transaction_by_sender_and_nonce(sender, 0).is_none());
|
||||
assert!(test_pool.get_queued_transactions_by_sender(sender).is_empty());
|
||||
assert!(test_pool.get_pending_transactions_by_sender(sender).is_empty());
|
||||
assert!(test_pool.get_highest_transaction_by_sender(sender).is_none());
|
||||
assert!(test_pool.get_highest_consecutive_transaction_by_sender(sender, 0).is_none());
|
||||
assert!(test_pool.remove_transactions_by_sender(sender).is_empty());
|
||||
assert_eq!(test_pool.sender_id(&sender), None);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -3104,35 +3104,6 @@ mod tests {
|
||||
assert_eq!(pool.len(), 1);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn insert_replace_underpriced_rounds_up_minimum_bump() {
|
||||
let on_chain_balance = U256::ZERO;
|
||||
let on_chain_nonce = 0;
|
||||
let mut f = MockTransactionFactory::default();
|
||||
let mut pool = AllTransactions { minimal_protocol_basefee: 0, ..Default::default() };
|
||||
let mut tx = MockTransaction::eip1559().inc_price().inc_limit();
|
||||
tx.set_priority_fee(1);
|
||||
tx.set_max_fee(1);
|
||||
|
||||
let first = f.validated(tx.clone());
|
||||
let _ = pool.insert_tx(first.clone(), on_chain_balance, on_chain_nonce).unwrap();
|
||||
|
||||
let mut replacement = f.validated(tx.rng_hash().inc_price());
|
||||
replacement.transaction.set_priority_fee(1);
|
||||
replacement.transaction.set_max_fee(2);
|
||||
let err =
|
||||
pool.insert_tx(replacement.clone(), on_chain_balance, on_chain_nonce).unwrap_err();
|
||||
assert!(matches!(err, InsertErr::Underpriced { .. }));
|
||||
assert!(pool.contains(first.hash()));
|
||||
assert_eq!(pool.len(), 1);
|
||||
|
||||
replacement.transaction.set_priority_fee(2);
|
||||
replacement.transaction.set_max_fee(2);
|
||||
let replaced = pool.insert_tx(replacement, on_chain_balance, on_chain_nonce).unwrap();
|
||||
assert!(replaced.replaced_tx.is_some());
|
||||
assert_eq!(pool.len(), 1);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn insert_conflicting_type_normal_to_blob() {
|
||||
let on_chain_balance = U256::from(10_000);
|
||||
|
||||
@@ -455,11 +455,9 @@ impl<T: PoolTransaction> ValidPoolTransaction<T> {
|
||||
//
|
||||
// The bump is different for EIP-4844 and other transactions. See `PriceBumpConfig`.
|
||||
let price_bump = price_bumps.price_bump(self.tx_type());
|
||||
let required_bumped_fee =
|
||||
|existing_fee: u128| existing_fee.saturating_mul(100 + price_bump).div_ceil(100);
|
||||
|
||||
// Check if the max fee per gas is underpriced.
|
||||
if maybe_replacement.max_fee_per_gas() < required_bumped_fee(self.max_fee_per_gas()) {
|
||||
if maybe_replacement.max_fee_per_gas() < self.max_fee_per_gas() * (100 + price_bump) / 100 {
|
||||
return true
|
||||
}
|
||||
|
||||
@@ -472,7 +470,7 @@ impl<T: PoolTransaction> ValidPoolTransaction<T> {
|
||||
if existing_max_priority_fee_per_gas != 0 &&
|
||||
replacement_max_priority_fee_per_gas != 0 &&
|
||||
replacement_max_priority_fee_per_gas <
|
||||
required_bumped_fee(existing_max_priority_fee_per_gas)
|
||||
existing_max_priority_fee_per_gas * (100 + price_bump) / 100
|
||||
{
|
||||
return true
|
||||
}
|
||||
@@ -482,7 +480,8 @@ impl<T: PoolTransaction> ValidPoolTransaction<T> {
|
||||
// This enforces that blob txs can only be replaced by blob txs
|
||||
let replacement_max_blob_fee_per_gas =
|
||||
maybe_replacement.transaction.max_fee_per_blob_gas().unwrap_or_default();
|
||||
if replacement_max_blob_fee_per_gas < required_bumped_fee(existing_max_blob_fee_per_gas)
|
||||
if replacement_max_blob_fee_per_gas <
|
||||
existing_max_blob_fee_per_gas * (100 + price_bump) / 100
|
||||
{
|
||||
return true
|
||||
}
|
||||
|
||||
@@ -1,13 +1,19 @@
|
||||
use super::{
|
||||
branch_child_idx::{BranchChildIdx, BranchChildIter},
|
||||
ArenaSparseNode, ArenaSparseNodeBranchChild, ArenaSparseNodeState, Index, NodeArena,
|
||||
ArenaSparseNode, ArenaSparseNodeBranchChild, ArenaSparseNodeState,
|
||||
};
|
||||
use alloc::vec::Vec;
|
||||
use reth_trie_common::Nibbles;
|
||||
use slotmap::{DefaultKey, SlotMap};
|
||||
use tracing::{instrument, trace};
|
||||
|
||||
const TRACE_TARGET: &str = "trie::arena::cursor";
|
||||
|
||||
/// Alias for the slotmap key type used as node references throughout the arena trie.
|
||||
type Index = DefaultKey;
|
||||
/// Alias for the slotmap used as the node arena throughout the arena trie.
|
||||
type NodeArena = SlotMap<Index, ArenaSparseNode>;
|
||||
|
||||
/// An entry on the cursor's traversal stack, tracking an ancestor node during trie walks.
|
||||
#[derive(Debug, Clone)]
|
||||
pub(super) struct ArenaCursorStackEntry {
|
||||
@@ -87,14 +93,17 @@ impl ArenaCursor {
|
||||
self.stack.len() - 1
|
||||
}
|
||||
|
||||
/// Replaces the root entry on the stack with a new one.
|
||||
///
|
||||
/// The stack must contain exactly the root (depth 0) or be empty (freshly constructed).
|
||||
/// Returns `true` if the stack is empty.
|
||||
pub(super) const fn is_empty(&self) -> bool {
|
||||
self.stack.is_empty()
|
||||
}
|
||||
|
||||
/// Clears the traversal stack and pushes the given root entry.
|
||||
#[instrument(level = "trace", target = TRACE_TARGET, skip(self, arena))]
|
||||
pub(super) fn reset(&mut self, arena: &NodeArena, idx: Index, path: Nibbles) {
|
||||
debug_assert!(
|
||||
self.stack.len() <= 1 && !self.needs_pop,
|
||||
"cursor must be drained before reset; stack has {} entries, needs_pop={}",
|
||||
self.stack.is_empty() && !self.needs_pop,
|
||||
"cursor must be fully drained before reset; stack has {} entries, needs_pop={}",
|
||||
self.stack.len(),
|
||||
self.needs_pop,
|
||||
);
|
||||
@@ -141,15 +150,18 @@ impl ArenaCursor {
|
||||
_ => false,
|
||||
});
|
||||
if child_is_dirty {
|
||||
*arena[parent.index].state_mut() = ArenaSparseNodeState::Dirty;
|
||||
let parent_state = arena[parent.index].state_mut();
|
||||
if !matches!(parent_state, ArenaSparseNodeState::Dirty) {
|
||||
*parent_state = ArenaSparseNodeState::Dirty;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
entry
|
||||
}
|
||||
|
||||
/// Drains the stack down to the root, propagating dirty state from each popped entry
|
||||
/// to its parent. The root entry remains on the stack (there is no parent to propagate to).
|
||||
/// Drains the stack, propagating dirty state from each entry to its parent,
|
||||
/// then removes the final (root) entry.
|
||||
#[instrument(level = "trace", target = TRACE_TARGET, skip_all)]
|
||||
pub(super) fn drain(&mut self, arena: &mut NodeArena) {
|
||||
trace!(target: TRACE_TARGET, "Draining stack");
|
||||
@@ -157,6 +169,7 @@ impl ArenaCursor {
|
||||
while self.stack.len() > 1 {
|
||||
self.pop(arena);
|
||||
}
|
||||
self.stack.clear();
|
||||
}
|
||||
|
||||
/// Returns the logical path of the branch at the top of the stack.
|
||||
|
||||
@@ -1623,25 +1623,29 @@ impl ArenaParallelSparseTrie {
|
||||
let child_nibble = head_path.last().expect("non-root leaf");
|
||||
let parent_branch = arena[parent_idx].branch_ref();
|
||||
|
||||
if parent_branch.state_mask.count_bits() == 2 &&
|
||||
parent_branch.sibling_child(child_nibble).is_blinded()
|
||||
{
|
||||
let sibling_nibble = parent_branch
|
||||
.state_mask
|
||||
.iter()
|
||||
.find(|&n| n != child_nibble)
|
||||
.expect("branch has two children");
|
||||
let mut sibling_path = cursor.parent_logical_branch_path(arena);
|
||||
sibling_path.push_unchecked(sibling_nibble);
|
||||
trace!(target: TRACE_TARGET, ?full_path, ?sibling_path, "Removal would collapse branch onto blinded sibling, requesting proof");
|
||||
return (
|
||||
RemoveLeafResult::NeedsProof {
|
||||
key,
|
||||
proof_key: Self::nibbles_to_padded_b256(&sibling_path),
|
||||
min_len: (sibling_path.len() as u8).min(64),
|
||||
},
|
||||
SubtrieCounterDeltas::default(),
|
||||
);
|
||||
if parent_branch.state_mask.count_bits() == 2 {
|
||||
let child_idx = BranchChildIdx::new(parent_branch.state_mask, child_nibble)
|
||||
.expect("leaf nibble not found in parent state_mask");
|
||||
// With exactly 2 bits set the dense array has indices 0 and 1.
|
||||
let sibling_dense_idx = 1 - child_idx.get();
|
||||
if parent_branch.children[sibling_dense_idx].is_blinded() {
|
||||
let sibling_nibble = parent_branch
|
||||
.state_mask
|
||||
.iter()
|
||||
.find(|&n| n != child_nibble)
|
||||
.expect("branch has two children");
|
||||
let mut sibling_path = cursor.parent_logical_branch_path(arena);
|
||||
sibling_path.push_unchecked(sibling_nibble);
|
||||
trace!(target: TRACE_TARGET, ?full_path, ?sibling_path, "Removal would collapse branch onto blinded sibling, requesting proof");
|
||||
return (
|
||||
RemoveLeafResult::NeedsProof {
|
||||
key,
|
||||
proof_key: Self::nibbles_to_padded_b256(&sibling_path),
|
||||
min_len: (sibling_path.len() as u8).min(64),
|
||||
},
|
||||
SubtrieCounterDeltas::default(),
|
||||
);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1649,7 +1653,10 @@ impl ArenaParallelSparseTrie {
|
||||
let removed_was_dirty =
|
||||
matches!(arena[head_idx].state_ref(), Some(ArenaSparseNodeState::Dirty));
|
||||
|
||||
if cursor.depth() == 0 {
|
||||
// Pop the leaf entry, propagating dirty state to the parent.
|
||||
cursor.pop(arena);
|
||||
|
||||
if cursor.is_empty() {
|
||||
// The leaf is the root — replace with EmptyRoot and reset the cursor
|
||||
// so subsequent iterations can call seek normally.
|
||||
arena.remove(head_idx);
|
||||
@@ -1664,9 +1671,6 @@ impl ArenaParallelSparseTrie {
|
||||
);
|
||||
}
|
||||
|
||||
// Pop the leaf entry, propagating dirty state to the parent.
|
||||
cursor.pop(arena);
|
||||
|
||||
// The parent must be a branch. Remove the leaf from it.
|
||||
let parent_entry = cursor.head().expect("cursor is non-empty");
|
||||
let parent_idx = parent_entry.index;
|
||||
@@ -1734,7 +1738,10 @@ impl ArenaParallelSparseTrie {
|
||||
return None;
|
||||
}
|
||||
|
||||
if !parent_branch.sibling_child(child_nibble).is_blinded() {
|
||||
let child_idx = BranchChildIdx::new(parent_branch.state_mask, child_nibble)
|
||||
.expect("child nibble not in parent state_mask");
|
||||
let sibling_dense_idx = 1 - child_idx.get();
|
||||
if !parent_branch.children[sibling_dense_idx].is_blinded() {
|
||||
return None;
|
||||
}
|
||||
|
||||
@@ -2127,6 +2134,21 @@ impl ArenaParallelSparseTrie {
|
||||
|
||||
Self::merge_subtrie_updates(&mut self.buffers.updates, &mut subtrie.buffers.updates);
|
||||
}
|
||||
|
||||
/// Returns true if the current `update_leaves` batch should defer revealed subtries for
|
||||
/// parallel processing. Will always return false in nostd builds.
|
||||
const fn is_update_leaves_parallelism_enabled(&self, num_updates: usize) -> bool {
|
||||
#[cfg(not(feature = "std"))]
|
||||
{
|
||||
let _ = num_updates;
|
||||
false
|
||||
}
|
||||
|
||||
#[cfg(feature = "std")]
|
||||
{
|
||||
num_updates >= self.parallelism_thresholds.min_updates
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl SparseTrie for ArenaParallelSparseTrie {
|
||||
@@ -2756,7 +2778,7 @@ impl SparseTrie for ArenaParallelSparseTrie {
|
||||
updates.drain().map(|(key, update)| (key, Nibbles::unpack(key), update)).collect();
|
||||
sorted.sort_unstable_by_key(|entry| entry.1);
|
||||
|
||||
let threshold = self.parallelism_thresholds.min_updates;
|
||||
let parallelize_subtries = self.is_update_leaves_parallelism_enabled(sorted.len());
|
||||
|
||||
let mut cursor = mem::take(&mut self.buffers.cursor);
|
||||
cursor.reset(&self.upper_arena, self.root, Nibbles::default());
|
||||
@@ -2817,8 +2839,10 @@ impl SparseTrie for ArenaParallelSparseTrie {
|
||||
|
||||
let num_subtrie_updates = update_idx - subtrie_start;
|
||||
|
||||
if num_subtrie_updates >= threshold {
|
||||
// Take subtrie for parallel update.
|
||||
if parallelize_subtries {
|
||||
// Take subtrie for deferred update. Once the batch-level threshold is
|
||||
// met, even small subtries are processed together so distributed work
|
||||
// does not get stranded on the serial path.
|
||||
trace!(target: TRACE_TARGET, ?subtrie_root_path, num_subtrie_updates, "Taking subtrie for parallel update");
|
||||
let ArenaSparseNode::Subtrie(subtrie) = mem::replace(
|
||||
&mut self.upper_arena[child_idx],
|
||||
@@ -3184,6 +3208,22 @@ mod tests {
|
||||
changeset
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn update_leaves_parallelism_threshold_is_batch_scoped() {
|
||||
let trie = ArenaParallelSparseTrie::default().with_parallelism_thresholds(
|
||||
ArenaParallelismThresholds {
|
||||
min_dirty_leaves: 64,
|
||||
min_revealed_nodes: 16,
|
||||
min_updates: 8,
|
||||
min_leaves_for_prune: 128,
|
||||
},
|
||||
);
|
||||
|
||||
assert!(!trie.is_update_leaves_parallelism_enabled(7));
|
||||
assert!(trie.is_update_leaves_parallelism_enabled(8));
|
||||
assert!(trie.is_update_leaves_parallelism_enabled(32));
|
||||
}
|
||||
|
||||
proptest! {
|
||||
#![proptest_config(ProptestConfig::with_cases(1000))]
|
||||
#[test]
|
||||
|
||||
@@ -1,13 +1,19 @@
|
||||
use super::{
|
||||
branch_child_idx::{BranchChildIdx, BranchChildIter},
|
||||
ArenaSparseSubtrie, Index, NodeArena,
|
||||
ArenaSparseSubtrie,
|
||||
};
|
||||
use alloc::{boxed::Box, vec::Vec};
|
||||
use alloy_primitives::{keccak256, B256};
|
||||
use alloy_trie::{BranchNodeCompact, TrieMask};
|
||||
use reth_trie_common::{BranchNodeMasks, Nibbles, ProofTrieNodeV2, RlpNode, TrieNodeV2};
|
||||
use slotmap::{DefaultKey, SlotMap};
|
||||
use smallvec::SmallVec;
|
||||
|
||||
/// Alias for the slotmap key type used as node references throughout the arena trie.
|
||||
type Index = DefaultKey;
|
||||
/// Alias for the slotmap used as the node arena throughout the arena trie.
|
||||
type NodeArena = SlotMap<Index, ArenaSparseNode>;
|
||||
|
||||
/// Tracks whether a node's RLP encoding is cached or needs recomputation.
|
||||
#[derive(Debug, Clone, PartialEq, Eq)]
|
||||
pub(super) enum ArenaSparseNodeState {
|
||||
@@ -100,23 +106,6 @@ impl ArenaSparseNodeBranch {
|
||||
self.state = ArenaSparseNodeState::Dirty;
|
||||
}
|
||||
|
||||
/// Returns a reference to the sibling child in a branch with exactly 2 children.
|
||||
///
|
||||
/// # Panics
|
||||
///
|
||||
/// Panics (debug) if the branch does not have exactly 2 children, or if `nibble` is not set.
|
||||
pub(super) fn sibling_child(&self, nibble: u8) -> &ArenaSparseNodeBranchChild {
|
||||
debug_assert_eq!(
|
||||
self.state_mask.count_bits(),
|
||||
2,
|
||||
"sibling_child requires exactly 2 children"
|
||||
);
|
||||
let child_idx =
|
||||
BranchChildIdx::new(self.state_mask, nibble).expect("nibble not found in state_mask");
|
||||
// With exactly 2 children the dense array has indices 0 and 1.
|
||||
&self.children[1 - child_idx.get()]
|
||||
}
|
||||
|
||||
/// Iterates over `(nibble, &ArenaSparseNodeBranchChild)` pairs in nibble order.
|
||||
pub(super) fn child_iter(
|
||||
&self,
|
||||
|
||||
@@ -50,7 +50,7 @@ group "default" {
|
||||
}
|
||||
|
||||
group "nightly" {
|
||||
targets = ["ethereum", "ethereum-profiling"]
|
||||
targets = ["ethereum", "ethereum-profiling", "ethereum-edge-profiling"]
|
||||
}
|
||||
|
||||
// Base target with shared configuration
|
||||
@@ -100,6 +100,17 @@ target "ethereum-profiling" {
|
||||
tags = ["${REGISTRY}/reth:nightly-profiling"]
|
||||
}
|
||||
|
||||
target "ethereum-edge-profiling" {
|
||||
inherits = ["_base_profiling"]
|
||||
args = {
|
||||
BINARY = "reth"
|
||||
MANIFEST_PATH = "bin/reth"
|
||||
BUILD_PROFILE = "profiling"
|
||||
FEATURES = "jemalloc-prof edge"
|
||||
}
|
||||
tags = ["${REGISTRY}/reth:nightly-edge-profiling"]
|
||||
}
|
||||
|
||||
// Hive test targets — single-platform, hivetests profile, tar output
|
||||
target "_base_hive" {
|
||||
inherits = ["_base"]
|
||||
@@ -113,7 +124,7 @@ variable "HIVE_OUTPUT_DIR" {
|
||||
default = "./artifacts"
|
||||
}
|
||||
|
||||
target "hive" {
|
||||
target "hive-stable" {
|
||||
inherits = ["_base_hive"]
|
||||
args = {
|
||||
BINARY = "reth"
|
||||
@@ -123,6 +134,17 @@ target "hive" {
|
||||
output = ["type=docker,dest=${HIVE_OUTPUT_DIR}/reth_image.tar"]
|
||||
}
|
||||
|
||||
target "hive-edge" {
|
||||
inherits = ["_base_hive"]
|
||||
args = {
|
||||
BINARY = "reth"
|
||||
MANIFEST_PATH = "bin/reth"
|
||||
FEATURES = "edge"
|
||||
}
|
||||
tags = ["reth:local"]
|
||||
output = ["type=docker,dest=${HIVE_OUTPUT_DIR}/reth_image.tar"]
|
||||
}
|
||||
|
||||
// Kurtosis test target
|
||||
target "kurtosis" {
|
||||
inherits = ["_base_hive"]
|
||||
|
||||
File diff suppressed because it is too large
Load Diff
@@ -3,7 +3,7 @@ name: reth
|
||||
services:
|
||||
lighthouse:
|
||||
restart: unless-stopped
|
||||
image: sigp/lighthouse:v8.1.2
|
||||
image: sigp/lighthouse:v8.0.1
|
||||
depends_on:
|
||||
- reth
|
||||
ports:
|
||||
|
||||
Reference in New Issue
Block a user