Compare commits

..

46 Commits

Author SHA1 Message Date
yongkangc
7e80c3cac2 merrge 2025-12-15 05:05:50 +00:00
yongkangc
6e8d62617d fix(multiproof): sr mismatvch
- Updated the handling of state updates to ensure that removals are recorded before merging updates, preserving the integrity of intermediate deletions.
- Introduced a new method, `record_removals`, to track monotonic removals from EVM state updates, ensuring that once a key is marked as removed, it remains so for proof invalidation.
- Improved comments for clarity on the removal tracking process and its implications for state updates.
2025-12-15 02:52:33 +00:00
yongkangc
866b8ded5f refactor(multiproof): streamline state update message handling
- Consolidated the sending of MultiProofMessage::StateUpdate to improve readability.
- Updated comments for clarity regarding the batch source logic in tests, ensuring it reflects the expected behavior of the PreBlock source.
2025-12-10 09:41:23 +00:00
yongkangc
51985e249c refactor(multiproof): batch consecutive
- Simplified the batching logic for state updates by removing unnecessary checks and consolidating the handling of different source types.
- Updated tests to verify that state updates from various sources can be batched together correctly while respecting the target limits.
- Improved clarity and maintainability of the code by refining comments and restructuring the logic for merging updates.
2025-12-10 09:41:23 +00:00
yongkangc
5ac911b707 refactor(engine): extract multiproof batch context into structs
Extract &mut parameters from process_multiproof_message into:
- MultiproofBatchCtx: core processing state (pending_msg, timing, updates_finished)
- MultiproofBatchMetrics: counters for proofs processed/requested

This improves code organization and reduces function parameter count.
2025-12-10 09:41:23 +00:00
yongkangc
e9a5a11a9f fix(engine): rename outcome to num_chunks for clarity
Addresses reviewer nit: the variable returned from dispatch_with_chunking
represents number of chunks, so the name should reflect that.
2025-12-10 09:41:23 +00:00
yongkangc
7dd14651e4 revert comparison 2025-12-10 09:41:22 +00:00
yongkangc
51ef406b94 Add bench compare latency stats 2025-12-10 09:38:03 +00:00
Matthias Seitz
a6b9472d1c fix: use generic header (#20250) 2025-12-10 09:11:39 +00:00
forkfury
6636d2a2ad docs: fix timestamp validation comment (#20246) 2025-12-10 08:41:23 +00:00
YK
ab6854d159 docs(reth-bench): fix incorrect output flag in README (#20240) 2025-12-10 07:18:34 +00:00
Charlie-Mack
5a274fc939 feat: add example for launching a node with custom rpc middleware (#20159) 2025-12-10 07:15:46 +00:00
radik878
c9431b224b refactor(rpc): remove dead got_notif flag from RpcService batch handler (#20171) 2025-12-10 07:15:09 +00:00
emmmm
8cbfd91db0 docs: add missing bodies_history and merkle_changesets prune config fields (#20244) 2025-12-10 07:10:57 +00:00
Block Wizard
43f9942ba7 docs(txpool): fix PoolSize total field comment to include blob pool (#20241) 2025-12-10 07:05:42 +00:00
Léa Narzis
06adc3ee0c refactor(rpc): return error instead of clamping for get_filter_block_range (#20218) 2025-12-10 07:03:30 +00:00
dependabot[bot]
fbf6be4cf2 chore(deps): bump dawidd6/action-homebrew-bump-formula from 6 to 7 (#20205)
Signed-off-by: dependabot[bot] <support@github.com>
Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com>
2025-12-10 07:01:28 +00:00
Forostovec
21d61d40d1 docs: document state and block overrides for trace_call (#20217) 2025-12-10 07:00:59 +00:00
YK
cf7d709358 perf(engine): batch multiproof messages (#20066)
Co-authored-by: 0xSooki <0xsooki@gmail.com>
2025-12-10 03:42:08 +00:00
Vitalyr
e9355caba5 feat(reth-bench-compare): add reth command to summary output (#20089) 2025-12-10 02:12:57 +00:00
Brian Picciano
fdd9d5bb40 docs(trie): correct TrieInput::extend_with_blocks docstring (#20225) 2025-12-10 02:03:42 +00:00
AJStonewee
9eeba7e6b3 feat(transaction-pool): add new_blob_pool_transactions_listener (#20216)
Co-authored-by: Matthias Seitz <matthias.seitz@outlook.de>
Co-authored-by: Alexey Shekhirin <5773434+shekhirin@users.noreply.github.com>
2025-12-09 23:41:00 +00:00
forkfury
0085acc868 docs: remove incorrect total_difficulty mention from process_iter (#20234)
Co-authored-by: Alexey Shekhirin <5773434+shekhirin@users.noreply.github.com>
2025-12-09 23:27:16 +00:00
Alexey Shekhirin
c697147f90 ci: use depot runners (#20222) 2025-12-09 23:03:44 +00:00
kurahin
7388d6636d docs(config): clarify PruneConfig::merge semantics (#20235) 2025-12-09 21:15:02 +00:00
SashaMalysehko
0b859c0735 fix(rpc): validate fee history reward percentiles (#20198) 2025-12-09 21:03:17 +00:00
yyhrnk
a8e0606fa7 fix(cli): reference correct --without-evm flag in init-state error (#20231) 2025-12-09 21:00:45 +00:00
Galoretka
969689d9b6 docs: add admin_peers and admin_clearTxpool sections (#20185) 2025-12-09 20:59:44 +00:00
Adrian
ad2081493a docs: add missing documentation for serde_bincode_compat::ExExNotification (#20236) 2025-12-09 20:59:05 +00:00
Brian Picciano
abfb6d3965 feat(cli): Allow walking a range of an MDBX table using db mdbx get (#20233) 2025-12-09 20:37:06 +00:00
Alexey Shekhirin
0f0eb7a531 feat(net): pool transactions import duration metric (#20228) 2025-12-09 13:57:01 +00:00
Alexey Shekhirin
4f1e486b4f feat(engine): execution wait, pre, post metrics (#20166) 2025-12-09 13:30:58 +00:00
Alexey Shekhirin
05307d088c perf(chain-state): executed_block_receipts_ref (#20227) 2025-12-09 13:08:15 +00:00
Arsenii Kulikov
245cca7ce2 perf: avoid collect in truncate_pool (#20221) 2025-12-09 11:08:21 +00:00
Arsenii Kulikov
28d6996fc4 feat: add helper method to eth validator (#20206) 2025-12-08 22:48:54 +00:00
Karl Yu
0eaffdf489 feat: add StorageSettings for StoragesHistory in RocksDB (#20154) 2025-12-08 22:22:36 +00:00
futreall
9c141cac4b fix(rpc): return error if toBlock exceeds current head (#20202) 2025-12-08 17:42:01 +00:00
Léa Narzis
fc6ab35c5c test(era): complete int tests with roundtrip mainnet era files (#20064) 2025-12-08 17:01:21 +00:00
joshieDo
f88bf4e427 fix: set merkle changesets distance minimum to 128 (#20200) 2025-12-08 16:10:11 +00:00
Matthias Seitz
3d330caf36 perf: avoid duplicate storage get call (#20180) 2025-12-08 16:02:22 +00:00
Matthias Seitz
5a43e77771 fix: trace filter range off by one (#20199) 2025-12-08 15:54:08 +00:00
forkfury
5b3c479ed5 feat(primitives-traits): add recover_transactions_ref to avoid cloning (#20187)
Co-authored-by: Matthias Seitz <matthias.seitz@outlook.de>
2025-12-08 14:51:07 +00:00
Matthias Seitz
dc06b47abe fix: make inserted blocks part of fcu canonical (#20164)
Co-authored-by: Alexey Shekhirin <5773434+shekhirin@users.noreply.github.com>
2025-12-08 14:06:39 +00:00
Arsenii Kulikov
e9cd7cc003 feat: parallelize recovery (#20169)
Co-authored-by: Alexey Shekhirin <5773434+shekhirin@users.noreply.github.com>
Co-authored-by: Matthias Seitz <matthias.seitz@outlook.de>
2025-12-08 14:05:37 +00:00
Alexey Shekhirin
f633efc969 ci: run on ubuntu instead of reth runner (#20196) 2025-12-08 14:30:20 +01:00
github-actions[bot]
2f55b1c30f chore(deps): weekly cargo update (#20174)
Co-authored-by: github-merge-queue <118344674+github-merge-queue@users.noreply.github.com>
2025-12-07 11:15:14 +00:00
62 changed files with 2491 additions and 1058 deletions

7
.github/actionlint.yaml vendored Normal file
View File

@@ -0,0 +1,7 @@
self-hosted-runner:
labels:
- depot-ubuntu-latest
- depot-ubuntu-latest-2
- depot-ubuntu-latest-4
- depot-ubuntu-latest-8
- depot-ubuntu-latest-16

View File

@@ -15,7 +15,7 @@ env:
name: bench
jobs:
codspeed:
runs-on: ubuntu-latest
runs-on: depot-ubuntu-latest
steps:
- uses: actions/checkout@v6
with:

View File

@@ -12,7 +12,7 @@ on:
jobs:
build:
runs-on: ubuntu-latest
runs-on: depot-ubuntu-latest-16
timeout-minutes: 90
steps:
- name: Checkout

View File

@@ -17,7 +17,7 @@ env:
name: compact-codec
jobs:
compact-codec:
runs-on: ubuntu-latest
runs-on: depot-ubuntu-latest-16
strategy:
matrix:
bin:

View File

@@ -19,7 +19,7 @@ concurrency:
jobs:
test:
name: e2e-testsuite
runs-on: ubuntu-latest
runs-on: depot-ubuntu-latest-16
env:
RUST_BACKTRACE: 1
timeout-minutes: 90

View File

@@ -24,7 +24,7 @@ jobs:
prepare-hive:
if: github.repository == 'paradigmxyz/reth'
timeout-minutes: 45
runs-on: ubuntu-latest
runs-on: depot-ubuntu-latest
steps:
- uses: actions/checkout@v6
- name: Checkout hive tests
@@ -178,7 +178,7 @@ jobs:
- prepare-reth
- prepare-hive
name: run ${{ matrix.scenario.sim }}${{ matrix.scenario.limit && format(' - {0}', matrix.scenario.limit) }}
runs-on: ubuntu-latest
runs-on: depot-ubuntu-latest
permissions:
issues: write
steps:
@@ -245,7 +245,7 @@ jobs:
notify-on-error:
needs: test
if: failure()
runs-on: ubuntu-latest
runs-on: depot-ubuntu-latest
steps:
- name: Slack Webhook Action
uses: rtCamp/action-slack-notify@v2

View File

@@ -23,7 +23,7 @@ jobs:
test:
name: test / ${{ matrix.network }}
if: github.event_name != 'schedule'
runs-on: ubuntu-latest
runs-on: depot-ubuntu-latest-16
env:
RUST_BACKTRACE: 1
strategy:

View File

@@ -32,7 +32,7 @@ jobs:
strategy:
fail-fast: false
name: run kurtosis
runs-on: ubuntu-latest
runs-on: depot-ubuntu-latest
needs:
- prepare-reth
steps:
@@ -85,7 +85,7 @@ jobs:
notify-on-error:
needs: test
if: failure()
runs-on: ubuntu-latest
runs-on: depot-ubuntu-latest
steps:
- name: Slack Webhook Action
uses: rtCamp/action-slack-notify@v2

View File

@@ -30,7 +30,7 @@ jobs:
strategy:
fail-fast: false
name: run kurtosis
runs-on: ubuntu-latest
runs-on: depot-ubuntu-latest
needs:
- prepare-reth
steps:
@@ -58,7 +58,7 @@ jobs:
notify-on-error:
needs: test
if: failure()
runs-on: ubuntu-latest
runs-on: depot-ubuntu-latest
steps:
- name: Slack Webhook Action
uses: rtCamp/action-slack-notify@v2

View File

@@ -12,7 +12,7 @@ env:
jobs:
clippy-binaries:
name: clippy binaries / ${{ matrix.type }}
runs-on: ubuntu-latest
runs-on: depot-ubuntu-latest
timeout-minutes: 30
strategy:
matrix:
@@ -40,7 +40,7 @@ jobs:
clippy:
name: clippy
runs-on: ubuntu-latest
runs-on: depot-ubuntu-latest-16
timeout-minutes: 30
steps:
- uses: actions/checkout@v6
@@ -56,7 +56,7 @@ jobs:
RUSTFLAGS: -D warnings
wasm:
runs-on: ubuntu-latest
runs-on: depot-ubuntu-latest
timeout-minutes: 30
steps:
- uses: actions/checkout@v6
@@ -75,7 +75,7 @@ jobs:
.github/assets/check_wasm.sh
riscv:
runs-on: ubuntu-latest
runs-on: depot-ubuntu-latest
timeout-minutes: 60
steps:
- uses: actions/checkout@v6
@@ -93,7 +93,7 @@ jobs:
crate-checks:
name: crate-checks (${{ matrix.partition }}/${{ matrix.total_partitions }})
runs-on: ubuntu-latest
runs-on: depot-ubuntu-latest-16
strategy:
matrix:
partition: [1, 2]
@@ -111,7 +111,7 @@ jobs:
msrv:
name: MSRV
runs-on: ubuntu-latest
runs-on: depot-ubuntu-latest
timeout-minutes: 30
strategy:
matrix:
@@ -133,7 +133,7 @@ jobs:
docs:
name: docs
runs-on: ubuntu-latest
runs-on: depot-ubuntu-latest-16
timeout-minutes: 30
steps:
- uses: actions/checkout@v6
@@ -150,7 +150,7 @@ jobs:
fmt:
name: fmt
runs-on: ubuntu-latest
runs-on: depot-ubuntu-latest
timeout-minutes: 30
steps:
- uses: actions/checkout@v6
@@ -163,7 +163,7 @@ jobs:
udeps:
name: udeps
runs-on: ubuntu-latest
runs-on: depot-ubuntu-latest-16
timeout-minutes: 30
steps:
- uses: actions/checkout@v6
@@ -177,7 +177,7 @@ jobs:
book:
name: book
runs-on: ubuntu-latest
runs-on: depot-ubuntu-latest-16
timeout-minutes: 30
steps:
- uses: actions/checkout@v6
@@ -232,10 +232,14 @@ jobs:
- name: Ensure no arbitrary or proptest dependency on default build
run: cargo tree --package reth -e=features,no-dev | grep -Eq "arbitrary|proptest" && exit 1 || exit 0
# Checks that selected rates can compile with power set of features
# Checks that selected crates can compile with power set of features
features:
name: features
runs-on: ubuntu-latest
name: features (${{ matrix.partition }}/${{ matrix.total_partitions }})
runs-on: depot-ubuntu-latest-16
strategy:
matrix:
partition: [1, 2]
total_partitions: [2]
timeout-minutes: 30
steps:
- uses: actions/checkout@v6
@@ -246,13 +250,19 @@ jobs:
cache-on-failure: true
- name: cargo install cargo-hack
uses: taiki-e/install-action@cargo-hack
- run: make check-features
- run: |
cargo hack check \
--package reth-codecs \
--package reth-primitives-traits \
--package reth-primitives \
--feature-powerset \
--partition ${{ matrix.partition }}/${{ matrix.total_partitions }}
env:
RUSTFLAGS: -D warnings
# Check crates correctly propagate features
feature-propagation:
runs-on: ubuntu-latest
runs-on: depot-ubuntu-latest
timeout-minutes: 20
steps:
- uses: actions/checkout@v6

View File

@@ -26,7 +26,7 @@ jobs:
prepare-reth:
if: github.repository == 'paradigmxyz/reth'
timeout-minutes: 45
runs-on: ubuntu-latest
runs-on: depot-ubuntu-latest
steps:
- uses: actions/checkout@v6
- run: mkdir artifacts

View File

@@ -12,7 +12,7 @@ jobs:
runs-on: ubuntu-latest
steps:
- name: Update Homebrew formula
uses: dawidd6/action-homebrew-bump-formula@v6
uses: dawidd6/action-homebrew-bump-formula@v7
with:
token: ${{ secrets.HOMEBREW }}
no_fork: true

View File

@@ -22,7 +22,7 @@ jobs:
name: stage-run-test
# Only run stage commands test in merge groups
if: github.event_name == 'merge_group'
runs-on: ubuntu-latest
runs-on: depot-ubuntu-latest
env:
RUST_LOG: info,sync=error
RUST_BACKTRACE: 1

View File

@@ -17,7 +17,7 @@ concurrency:
jobs:
sync:
name: sync (${{ matrix.chain.bin }})
runs-on: ubuntu-latest
runs-on: depot-ubuntu-latest
env:
RUST_LOG: info,sync=error
RUST_BACKTRACE: 1

View File

@@ -17,7 +17,7 @@ concurrency:
jobs:
sync:
name: sync (${{ matrix.chain.bin }})
runs-on: ubuntu-latest
runs-on: depot-ubuntu-latest
env:
RUST_LOG: info,sync=error
RUST_BACKTRACE: 1

View File

@@ -19,7 +19,7 @@ concurrency:
jobs:
test:
name: test / ${{ matrix.type }} (${{ matrix.partition }}/${{ matrix.total_partitions }})
runs-on: ubuntu-latest
runs-on: depot-ubuntu-latest-16
env:
RUST_BACKTRACE: 1
strategy:
@@ -64,7 +64,7 @@ jobs:
state:
name: Ethereum state tests
runs-on: ubuntu-latest
runs-on: depot-ubuntu-latest-4
env:
RUST_LOG: info,sync=error
RUST_BACKTRACE: 1
@@ -98,7 +98,7 @@ jobs:
doc:
name: doc tests
runs-on: ubuntu-latest
runs-on: depot-ubuntu-latest-16
env:
RUST_BACKTRACE: 1
timeout-minutes: 30

View File

@@ -11,7 +11,7 @@ on:
jobs:
check-reth:
runs-on: ubuntu-24.04
runs-on: depot-ubuntu-latest-16
timeout-minutes: 60
steps:
@@ -30,7 +30,7 @@ jobs:
run: cargo check --target x86_64-pc-windows-gnu
check-op-reth:
runs-on: ubuntu-24.04
runs-on: depot-ubuntu-latest-16
timeout-minutes: 60
steps:

11
Cargo.lock generated
View File

@@ -3671,6 +3671,17 @@ dependencies = [
"tracing",
]
[[package]]
name = "example-custom-rpc-middleware"
version = "0.0.0"
dependencies = [
"clap",
"jsonrpsee",
"reth-ethereum",
"tower",
"tracing",
]
[[package]]
name = "example-db-access"
version = "0.0.0"

View File

@@ -153,6 +153,7 @@ members = [
"examples/custom-node-components/",
"examples/custom-payload-builder/",
"examples/custom-rlpx-subprotocol",
"examples/custom-rpc-middleware",
"examples/custom-node",
"examples/db-access",
"examples/engine-api-access",

View File

@@ -523,8 +523,3 @@ pr:
make test
check-features:
cargo hack check \
--package reth-codecs \
--package reth-primitives-traits \
--package reth-primitives \
--feature-powerset

View File

@@ -506,8 +506,8 @@ async fn run_warmup_phase(
// Build additional args with conditional --debug.startup-sync-state-idle flag
let additional_args = args.build_additional_args("warmup", args.baseline_args.as_ref());
// Start reth node for warmup
let mut node_process =
// Start reth node for warmup (command is not stored for warmup phase)
let (mut node_process, _warmup_command) =
node_manager.start_node(&binary_path, warmup_ref, "warmup", &additional_args).await?;
// Wait for node to be ready and get its current tip
@@ -607,8 +607,8 @@ async fn run_benchmark_workflow(
// Build additional args with conditional --debug.startup-sync-state-idle flag
let additional_args = args.build_additional_args(ref_type, base_args_str);
// Start reth node
let mut node_process =
// Start reth node and capture the command for reporting
let (mut node_process, reth_command) =
node_manager.start_node(&binary_path, git_ref, ref_type, &additional_args).await?;
// Wait for node to be ready and get its current tip (wherever it is)
@@ -645,8 +645,9 @@ async fn run_benchmark_workflow(
// Store results for comparison
comparison_generator.add_ref_results(ref_type, &output_dir)?;
// Set the benchmark run timestamps
// Set the benchmark run timestamps and reth command
comparison_generator.set_ref_timestamps(ref_type, benchmark_start, benchmark_end)?;
comparison_generator.set_ref_command(ref_type, reth_command)?;
info!("Completed {} reference benchmark", ref_type);
}

View File

@@ -21,6 +21,8 @@ pub(crate) struct ComparisonGenerator {
feature_ref_name: String,
baseline_results: Option<BenchmarkResults>,
feature_results: Option<BenchmarkResults>,
baseline_command: Option<String>,
feature_command: Option<String>,
}
/// Represents the results from a single benchmark run
@@ -57,7 +59,6 @@ pub(crate) struct TotalGasRow {
/// - `mean_new_payload_latency_ms`: arithmetic mean latency across blocks.
/// - `median_new_payload_latency_ms`: p50 latency across blocks.
/// - `p90_new_payload_latency_ms` / `p99_new_payload_latency_ms`: tail latencies across blocks.
/// - `std_dev_new_payload_latency_ms`: standard deviation of latency across blocks.
#[derive(Debug, Clone, Serialize)]
pub(crate) struct BenchmarkSummary {
pub total_blocks: u64,
@@ -67,7 +68,6 @@ pub(crate) struct BenchmarkSummary {
pub median_new_payload_latency_ms: f64,
pub p90_new_payload_latency_ms: f64,
pub p99_new_payload_latency_ms: f64,
pub std_dev_new_payload_latency_ms: f64,
pub gas_per_second: f64,
pub blocks_per_second: f64,
pub min_block_number: u64,
@@ -91,6 +91,7 @@ pub(crate) struct RefInfo {
pub summary: BenchmarkSummary,
pub start_timestamp: Option<DateTime<Utc>>,
pub end_timestamp: Option<DateTime<Utc>>,
pub reth_command: Option<String>,
}
/// Summary of the comparison between references.
@@ -98,7 +99,6 @@ pub(crate) struct RefInfo {
/// Percent deltas are `(feature - baseline) / baseline * 100`:
/// - `new_payload_latency_p50_change_percent` / p90 / p99: percent changes of the respective
/// per-block percentiles.
/// - `std_dev_change_percent`: percent change in standard deviation of newPayload latency.
/// - `per_block_latency_change_mean_percent` / `per_block_latency_change_median_percent` are the
/// mean and median of per-block percent deltas (feature vs baseline), capturing block-level
/// drift.
@@ -117,7 +117,6 @@ pub(crate) struct ComparisonSummary {
pub new_payload_latency_p50_change_percent: f64,
pub new_payload_latency_p90_change_percent: f64,
pub new_payload_latency_p99_change_percent: f64,
pub std_dev_change_percent: f64,
pub gas_per_second_change_percent: f64,
pub blocks_per_second_change_percent: f64,
}
@@ -146,6 +145,8 @@ impl ComparisonGenerator {
feature_ref_name: args.feature_ref.clone(),
baseline_results: None,
feature_results: None,
baseline_command: None,
feature_command: None,
}
}
@@ -210,6 +211,21 @@ impl ComparisonGenerator {
Ok(())
}
/// Set the reth command for a reference
pub(crate) fn set_ref_command(&mut self, ref_type: &str, command: String) -> Result<()> {
match ref_type {
"baseline" => {
self.baseline_command = Some(command);
}
"feature" => {
self.feature_command = Some(command);
}
_ => return Err(eyre!("Unknown reference type: {}", ref_type)),
}
Ok(())
}
/// Generate the final comparison report
pub(crate) async fn generate_comparison_report(&self) -> Result<()> {
info!("Generating comparison report...");
@@ -234,12 +250,14 @@ impl ComparisonGenerator {
summary: baseline.summary.clone(),
start_timestamp: baseline.start_timestamp,
end_timestamp: baseline.end_timestamp,
reth_command: self.baseline_command.clone(),
},
feature: RefInfo {
ref_name: feature.ref_name.clone(),
summary: feature.summary.clone(),
start_timestamp: feature.start_timestamp,
end_timestamp: feature.end_timestamp,
reth_command: self.feature_command.clone(),
},
comparison_summary,
per_block_comparisons,
@@ -339,9 +357,6 @@ impl ComparisonGenerator {
let mean_new_payload_latency_ms: f64 =
latencies_ms.iter().sum::<f64>() / total_blocks as f64;
let std_dev_new_payload_latency_ms =
calculate_std_dev(&latencies_ms, mean_new_payload_latency_ms);
let mut sorted_latencies_ms = latencies_ms;
sorted_latencies_ms.sort_by(|a, b| a.partial_cmp(b).unwrap_or(Ordering::Equal));
let median_new_payload_latency_ms = percentile(&sorted_latencies_ms, 0.5);
@@ -372,7 +387,6 @@ impl ComparisonGenerator {
median_new_payload_latency_ms,
p90_new_payload_latency_ms,
p99_new_payload_latency_ms,
std_dev_new_payload_latency_ms,
gas_per_second,
blocks_per_second,
min_block_number,
@@ -440,10 +454,6 @@ impl ComparisonGenerator {
baseline.p99_new_payload_latency_ms,
feature.p99_new_payload_latency_ms,
),
std_dev_change_percent: calc_percent_change(
baseline.std_dev_new_payload_latency_ms,
feature.std_dev_new_payload_latency_ms,
),
gas_per_second_change_percent: calc_percent_change(
baseline.gas_per_second,
feature.gas_per_second,
@@ -574,7 +584,6 @@ impl ComparisonGenerator {
" NewPayload Latency p99: {:+.2}%",
summary.new_payload_latency_p99_change_percent
);
println!(" NewPayload Latency std dev: {:+.2}%", summary.std_dev_change_percent);
println!(
" Gas/Second: {:+.2}%",
summary.gas_per_second_change_percent
@@ -597,12 +606,11 @@ impl ComparisonGenerator {
);
println!(" NewPayload latency (ms):");
println!(
" mean: {:.2}, p50: {:.2}, p90: {:.2}, p99: {:.2}, std dev: {:.2}",
" mean: {:.2}, p50: {:.2}, p90: {:.2}, p99: {:.2}",
baseline.mean_new_payload_latency_ms,
baseline.median_new_payload_latency_ms,
baseline.p90_new_payload_latency_ms,
baseline.p99_new_payload_latency_ms,
baseline.std_dev_new_payload_latency_ms
baseline.p99_new_payload_latency_ms
);
if let (Some(start), Some(end)) =
(&report.baseline.start_timestamp, &report.baseline.end_timestamp)
@@ -613,6 +621,9 @@ impl ComparisonGenerator {
end.format("%Y-%m-%d %H:%M:%S UTC")
);
}
if let Some(ref cmd) = report.baseline.reth_command {
println!(" Command: {}", cmd);
}
println!();
println!("Feature Summary:");
@@ -627,12 +638,11 @@ impl ComparisonGenerator {
);
println!(" NewPayload latency (ms):");
println!(
" mean: {:.2}, p50: {:.2}, p90: {:.2}, p99: {:.2}, std dev: {:.2}",
" mean: {:.2}, p50: {:.2}, p90: {:.2}, p99: {:.2}",
feature.mean_new_payload_latency_ms,
feature.median_new_payload_latency_ms,
feature.p90_new_payload_latency_ms,
feature.p99_new_payload_latency_ms,
feature.std_dev_new_payload_latency_ms
feature.p99_new_payload_latency_ms
);
if let (Some(start), Some(end)) =
(&report.feature.start_timestamp, &report.feature.end_timestamp)
@@ -643,6 +653,9 @@ impl ComparisonGenerator {
end.format("%Y-%m-%d %H:%M:%S UTC")
);
}
if let Some(ref cmd) = report.feature.reth_command {
println!(" Command: {}", cmd);
}
println!();
}
}

View File

@@ -240,19 +240,24 @@ impl NodeManager {
}
/// Start a reth node using the specified binary path and return the process handle
/// along with the formatted reth command string for reporting.
pub(crate) async fn start_node(
&mut self,
binary_path: &std::path::Path,
_git_ref: &str,
ref_type: &str,
additional_args: &[String],
) -> Result<tokio::process::Child> {
) -> Result<(tokio::process::Child, String)> {
// Store the binary path for later use (e.g., in unwind_to_block)
self.binary_path = Some(binary_path.to_path_buf());
let binary_path_str = binary_path.to_string_lossy();
let (reth_args, _) = self.build_reth_args(&binary_path_str, additional_args, ref_type);
// Format the reth command string for reporting
let reth_command = shlex::try_join(reth_args.iter().map(|s| s.as_str()))
.wrap_err("Failed to format reth command string")?;
// Log additional arguments if any
if !self.additional_reth_args.is_empty() {
info!("Using common additional reth arguments: {:?}", self.additional_reth_args);
@@ -346,7 +351,7 @@ impl NodeManager {
// Give the node a moment to start up
sleep(Duration::from_secs(5)).await;
Ok(child)
Ok((child, reth_command))
}
/// Wait for the node to be ready and return its current tip

View File

@@ -143,5 +143,5 @@ To reproduce the benchmark, first re-set the node to the block that the benchmar
- **RPC Configuration**: The RPC endpoints should be accessible and configured correctly, specifically the RPC endpoint must support `eth_getBlockByNumber` and support fetching full transactions. The benchmark will make one RPC query per block as fast as possible, so ensure the RPC endpoint does not rate limit or block requests after a certain volume.
- **Reproducibility**: Ensure that the node is at the same state before attempting to retry a benchmark. The `new-payload-fcu` command specifically will commit to the database, so the node must be rolled back using `reth stage unwind` to reproducibly retry benchmarks.
- **Profiling tools**: If you are collecting CPU profiles, tools like [`samply`](https://github.com/mstange/samply) and [`perf`](https://perf.wiki.kernel.org/index.php/Main_Page) can be useful for analyzing node performance.
- **Benchmark Data**: `reth-bench` additionally contains a `--benchmark.output` flag, which will output gas used benchmarks across the benchmark range in CSV format. This may be useful for further data analysis.
- **Benchmark Data**: `reth-bench` additionally contains a `--output` flag, which will output gas used benchmarks across the benchmark range in CSV format. This may be useful for further data analysis.
- **Platform Information**: To ensure accurate and reproducible benchmarking, document the platform details, including hardware specifications, OS version, and any other relevant information before publishing any benchmarks.

View File

@@ -18,7 +18,7 @@ use reth_primitives_traits::{
};
use reth_storage_api::StateProviderBox;
use reth_trie::{updates::TrieUpdatesSorted, HashedPostStateSorted, TrieInputSorted};
use std::{collections::BTreeMap, sync::Arc, time::Instant};
use std::{collections::BTreeMap, ops::Deref, sync::Arc, time::Instant};
use tokio::sync::{broadcast, watch};
/// Size of the broadcast channel used to notify canonical state events.
@@ -634,6 +634,8 @@ impl<N: NodePrimitives> BlockState<N> {
/// We assume that the `Receipts` in the executed block `ExecutionOutcome`
/// has only one element corresponding to the executed block associated to
/// the state.
///
/// This clones the vector of receipts. To avoid it, use [`Self::executed_block_receipts_ref`].
pub fn executed_block_receipts(&self) -> Vec<N::Receipt> {
let receipts = self.receipts();
@@ -646,6 +648,22 @@ impl<N: NodePrimitives> BlockState<N> {
receipts.first().cloned().unwrap_or_default()
}
/// Returns a slice of `Receipt` of executed block that determines the state.
/// We assume that the `Receipts` in the executed block `ExecutionOutcome`
/// has only one element corresponding to the executed block associated to
/// the state.
pub fn executed_block_receipts_ref(&self) -> &[N::Receipt] {
let receipts = self.receipts();
debug_assert!(
receipts.len() <= 1,
"Expected at most one block's worth of receipts, found {}",
receipts.len()
);
receipts.first().map(|receipts| receipts.deref()).unwrap_or_default()
}
/// Returns a vector of __parent__ `BlockStates`.
///
/// The block state order in the output vector is newest to oldest (highest to lowest):

View File

@@ -8,12 +8,17 @@ use reth_db::{
RawDupSort,
};
use reth_db_api::{
table::{Decompress, DupSort, Table},
tables, RawKey, RawTable, Receipts, TableViewer, Transactions,
cursor::{DbCursorRO, DbDupCursorRO},
database::Database,
table::{Compress, Decompress, DupSort, Table},
tables,
transaction::DbTx,
RawKey, RawTable, Receipts, TableViewer, Transactions,
};
use reth_db_common::DbTool;
use reth_node_api::{HeaderTy, ReceiptTy, TxTy};
use reth_node_builder::NodeTypesWithDB;
use reth_primitives_traits::ValueWithSubKey;
use reth_provider::{providers::ProviderNodeTypes, StaticFileProviderFactory};
use reth_static_file_types::StaticFileSegment;
use tracing::error;
@@ -39,6 +44,14 @@ enum Subcommand {
#[arg(value_parser = maybe_json_value_parser)]
subkey: Option<String>,
/// Optional end key for range query (exclusive upper bound)
#[arg(value_parser = maybe_json_value_parser)]
end_key: Option<String>,
/// Optional end subkey for range query (exclusive upper bound)
#[arg(value_parser = maybe_json_value_parser)]
end_subkey: Option<String>,
/// Output bytes instead of human-readable decoded value
#[arg(long)]
raw: bool,
@@ -61,8 +74,8 @@ impl Command {
/// Execute `db get` command
pub fn execute<N: ProviderNodeTypes>(self, tool: &DbTool<N>) -> eyre::Result<()> {
match self.subcommand {
Subcommand::Mdbx { table, key, subkey, raw } => {
table.view(&GetValueViewer { tool, key, subkey, raw })?
Subcommand::Mdbx { table, key, subkey, end_key, end_subkey, raw } => {
table.view(&GetValueViewer { tool, key, subkey, end_key, end_subkey, raw })?
}
Subcommand::StaticFile { segment, key, raw } => {
let (key, mask): (u64, _) = match segment {
@@ -154,6 +167,8 @@ struct GetValueViewer<'a, N: NodeTypesWithDB> {
tool: &'a DbTool<N>,
key: String,
subkey: Option<String>,
end_key: Option<String>,
end_subkey: Option<String>,
raw: bool,
}
@@ -163,53 +178,158 @@ impl<N: ProviderNodeTypes> TableViewer<()> for GetValueViewer<'_, N> {
fn view<T: Table>(&self) -> Result<(), Self::Error> {
let key = table_key::<T>(&self.key)?;
let content = if self.raw {
self.tool
.get::<RawTable<T>>(RawKey::from(key))?
.map(|content| hex::encode_prefixed(content.raw_value()))
} else {
self.tool.get::<T>(key)?.as_ref().map(serde_json::to_string_pretty).transpose()?
};
// A non-dupsort table cannot have subkeys. The `subkey` arg becomes the `end_key`. First we
// check that `end_key` and `end_subkey` weren't previously given, as that wouldn't be
// valid.
if self.end_key.is_some() || self.end_subkey.is_some() {
return Err(eyre::eyre!("Only END_KEY can be given for non-DUPSORT tables"));
}
match content {
Some(content) => {
println!("{content}");
}
None => {
error!(target: "reth::cli", "No content for the given table key.");
}
};
let end_key = self.subkey.clone();
// Check if we're doing a range query
if let Some(ref end_key_str) = end_key {
let end_key = table_key::<T>(end_key_str)?;
// Use walk_range to iterate over the range
self.tool.provider_factory.db_ref().view(|tx| {
let mut cursor = tx.cursor_read::<T>()?;
let walker = cursor.walk_range(key..end_key)?;
for result in walker {
let (k, v) = result?;
let json_val = if self.raw {
let raw_key = RawKey::from(k);
serde_json::json!({
"key": hex::encode_prefixed(raw_key.raw_key()),
"val": hex::encode_prefixed(v.compress().as_ref()),
})
} else {
serde_json::json!({
"key": &k,
"val": &v,
})
};
println!("{}", serde_json::to_string_pretty(&json_val)?);
}
Ok::<_, eyre::Report>(())
})??;
} else {
// Single key lookup
let content = if self.raw {
self.tool
.get::<RawTable<T>>(RawKey::from(key))?
.map(|content| hex::encode_prefixed(content.raw_value()))
} else {
self.tool.get::<T>(key)?.as_ref().map(serde_json::to_string_pretty).transpose()?
};
match content {
Some(content) => {
println!("{content}");
}
None => {
error!(target: "reth::cli", "No content for the given table key.");
}
};
}
Ok(())
}
fn view_dupsort<T: DupSort>(&self) -> Result<(), Self::Error> {
fn view_dupsort<T: DupSort>(&self) -> Result<(), Self::Error>
where
T::Value: reth_primitives_traits::ValueWithSubKey<SubKey = T::SubKey>,
{
// get a key for given table
let key = table_key::<T>(&self.key)?;
// process dupsort table
let subkey = table_subkey::<T>(self.subkey.as_deref())?;
let content = if self.raw {
self.tool
.get_dup::<RawDupSort<T>>(RawKey::from(key), RawKey::from(subkey))?
.map(|content| hex::encode_prefixed(content.raw_value()))
} else {
self.tool
.get_dup::<T>(key, subkey)?
// Check if we're doing a range query
if let Some(ref end_key_str) = self.end_key {
let end_key = table_key::<T>(end_key_str)?;
let start_subkey = table_subkey::<T>(Some(
self.subkey.as_ref().expect("must have been given if end_key is given").as_str(),
))?;
let end_subkey_parsed = self
.end_subkey
.as_ref()
.map(serde_json::to_string_pretty)
.transpose()?
};
.map(|s| table_subkey::<T>(Some(s.as_str())))
.transpose()?;
match content {
Some(content) => {
println!("{content}");
}
None => {
error!(target: "reth::cli", "No content for the given table subkey.");
}
};
self.tool.provider_factory.db_ref().view(|tx| {
let mut cursor = tx.cursor_dup_read::<T>()?;
// Seek to the starting key. If there is actually a key at the starting key then
// seek to the subkey within it.
if let Some((decoded_key, _)) = cursor.seek(key.clone())? &&
decoded_key == key
{
cursor.seek_by_key_subkey(key.clone(), start_subkey.clone())?;
}
// Get the current position to start iteration
let mut current = cursor.current()?;
while let Some((decoded_key, decoded_value)) = current {
// Extract the subkey using the ValueWithSubKey trait
let decoded_subkey = decoded_value.get_subkey();
// Check if we've reached the end (exclusive)
if (&decoded_key, Some(&decoded_subkey)) >=
(&end_key, end_subkey_parsed.as_ref())
{
break;
}
// Output the entry with both key and subkey
let json_val = if self.raw {
let raw_key = RawKey::from(decoded_key.clone());
serde_json::json!({
"key": hex::encode_prefixed(raw_key.raw_key()),
"val": hex::encode_prefixed(decoded_value.compress().as_ref()),
})
} else {
serde_json::json!({
"key": &decoded_key,
"val": &decoded_value,
})
};
println!("{}", serde_json::to_string_pretty(&json_val)?);
// Move to next entry
current = cursor.next()?;
}
Ok::<_, eyre::Report>(())
})??;
} else {
// Single key/subkey lookup
let subkey = table_subkey::<T>(self.subkey.as_deref())?;
let content = if self.raw {
self.tool
.get_dup::<RawDupSort<T>>(RawKey::from(key), RawKey::from(subkey))?
.map(|content| hex::encode_prefixed(content.raw_value()))
} else {
self.tool
.get_dup::<T>(key, subkey)?
.as_ref()
.map(serde_json::to_string_pretty)
.transpose()?
};
match content {
Some(content) => {
println!("{content}");
}
None => {
error!(target: "reth::cli", "No content for the given table subkey.");
}
};
}
Ok(())
}
}

View File

@@ -110,7 +110,7 @@ impl<C: ChainSpecParser<ChainSpec: EthChainSpec + EthereumHardforks>> InitStateC
static_file_provider.commit()?;
} else if last_block_number > 0 && last_block_number < header.number() {
return Err(eyre::eyre!(
"Data directory should be empty when calling init-state with --without-evm-history."
"Data directory should be empty when calling init-state with --without-evm."
));
}
}

View File

@@ -531,8 +531,12 @@ impl PruneConfig {
self.segments.receipts.is_some() || !self.segments.receipts_log_filter.is_empty()
}
/// Merges another `PruneConfig` into this one, taking values from the other config if and only
/// if the corresponding value in this config is not set.
/// Merges values from `other` into `self`.
/// - `Option<PruneMode>` fields: set from `other` only if `self` is `None`.
/// - `block_interval`: set from `other` only if `self.block_interval ==
/// DEFAULT_BLOCK_INTERVAL`.
/// - `merkle_changesets`: always set from `other`.
/// - `receipts_log_filter`: set from `other` only if `self` is empty and `other` is non-empty.
pub fn merge(&mut self, other: Self) {
let Self {
block_interval,
@@ -561,7 +565,7 @@ impl PruneConfig {
self.segments.account_history = self.segments.account_history.or(account_history);
self.segments.storage_history = self.segments.storage_history.or(storage_history);
self.segments.bodies_history = self.segments.bodies_history.or(bodies_history);
// Merkle changesets is not optional, so we just replace it if provided
// Merkle changesets is not optional; always take the value from `other`
self.segments.merkle_changesets = merkle_changesets;
if self.segments.receipts_log_filter.0.is_empty() && !receipts_log_filter.0.is_empty() {

View File

@@ -327,7 +327,7 @@ pub fn validate_against_parent_eip1559_base_fee<ChainSpec: EthChainSpec + Ethere
Ok(())
}
/// Validates the timestamp against the parent to make sure it is in the past.
/// Validates that the block timestamp is greater than the parent block timestamp.
#[inline]
pub fn validate_against_parent_timestamp<H: BlockHeader>(
header: &H,

View File

@@ -63,7 +63,7 @@ impl EngineApiMetrics {
pub(crate) fn execute_metered<E, DB>(
&self,
executor: E,
transactions: impl Iterator<Item = Result<impl ExecutableTx<E>, BlockExecutionError>>,
mut transactions: impl Iterator<Item = Result<impl ExecutableTx<E>, BlockExecutionError>>,
state_hook: Box<dyn OnStateHook>,
) -> Result<(BlockExecutionOutput<E::Receipt>, Vec<Address>), BlockExecutionError>
where
@@ -79,27 +79,42 @@ impl EngineApiMetrics {
let mut executor = executor.with_state_hook(Some(Box::new(wrapper)));
let f = || {
let start = Instant::now();
debug_span!(target: "engine::tree", "pre execution")
.entered()
.in_scope(|| executor.apply_pre_execution_changes())?;
self.executor.pre_execution_histogram.record(start.elapsed());
let exec_span = debug_span!(target: "engine::tree", "execution").entered();
for tx in transactions {
loop {
let start = Instant::now();
let Some(tx) = transactions.next() else { break };
self.executor.transaction_wait_histogram.record(start.elapsed());
let tx = tx?;
senders.push(*tx.signer());
let span =
debug_span!(target: "engine::tree", "execute tx", tx_hash=?tx.tx().tx_hash());
let enter = span.entered();
trace!(target: "engine::tree", "Executing transaction");
senders.push(*tx.signer());
let start = Instant::now();
let gas_used = executor.execute_transaction(tx)?;
self.executor.transaction_execution_histogram.record(start.elapsed());
// record the tx gas used
enter.record("gas_used", gas_used);
}
drop(exec_span);
debug_span!(target: "engine::tree", "finish")
let start = Instant::now();
let result = debug_span!(target: "engine::tree", "finish")
.entered()
.in_scope(|| executor.finish())
.map(|(evm, result)| (evm.into_db(), result))
.map(|(evm, result)| (evm.into_db(), result));
self.executor.post_execution_histogram.record(start.elapsed());
result
};
// Use metered to execute and track timing/gas metrics

File diff suppressed because it is too large Load Diff

View File

@@ -252,7 +252,7 @@ where
/// Extracts block headers and bodies from `iter` and appends them using `writer` and `provider`.
///
/// Adds on to `total_difficulty` and collects hash to height using `hash_collector`.
/// Collects hash to height using `hash_collector`.
///
/// Skips all blocks below the [`start_bound`] of `block_numbers` and stops when reaching past the
/// [`end_bound`] or the end of the file.

View File

@@ -17,6 +17,14 @@ pub struct ExecutorMetrics {
/// The Histogram for amount of gas used.
pub gas_used_histogram: Histogram,
/// The Histogram for amount of time taken to execute the pre-execution changes.
pub pre_execution_histogram: Histogram,
/// The Histogram for amount of time taken to wait for one transaction to be available.
pub transaction_wait_histogram: Histogram,
/// The Histogram for amount of time taken to execute one transaction.
pub transaction_execution_histogram: Histogram,
/// The Histogram for amount of time taken to execute the post-execution changes.
pub post_execution_histogram: Histogram,
/// The Histogram for amount of time taken to execute blocks.
pub execution_histogram: Histogram,
/// The total amount of time it took to execute the latest block.

View File

@@ -95,17 +95,33 @@ pub(super) mod serde_bincode_compat {
/// notification: ExExNotification<N>,
/// }
/// ```
///
/// This enum mirrors [`super::ExExNotification`] but uses borrowed [`Chain`] types
/// instead of `Arc<Chain>` for bincode compatibility.
#[derive(Debug, Serialize, Deserialize)]
#[expect(missing_docs)]
#[serde(bound = "")]
#[expect(clippy::large_enum_variant)]
pub enum ExExNotification<'a, N>
where
N: NodePrimitives,
{
ChainCommitted { new: Chain<'a, N> },
ChainReorged { old: Chain<'a, N>, new: Chain<'a, N> },
ChainReverted { old: Chain<'a, N> },
/// Chain got committed without a reorg, and only the new chain is returned.
ChainCommitted {
/// The new chain after commit.
new: Chain<'a, N>,
},
/// Chain got reorged, and both the old and the new chains are returned.
ChainReorged {
/// The old chain before reorg.
old: Chain<'a, N>,
/// The new chain after reorg.
new: Chain<'a, N>,
},
/// Chain got reverted, and only the old chain is returned.
ChainReverted {
/// The old chain before reversion.
old: Chain<'a, N>,
},
}
impl<'a, N> From<&'a super::ExExNotification<N>> for ExExNotification<'a, N>

View File

@@ -131,6 +131,8 @@ pub struct TransactionsManagerMetrics {
/// capacity. Note, this is not a limit to the number of inflight requests, but a health
/// measure.
pub(crate) capacity_pending_pool_imports: Counter,
/// The time it took to prepare transactions for import. This is mostly sender recovery.
pub(crate) pool_import_prepare_duration: Histogram,
/* ================ POLL DURATION ================ */

View File

@@ -1338,6 +1338,8 @@ where
let Some(peer) = self.peers.get_mut(&peer_id) else { return };
let mut transactions = transactions.0;
let start = Instant::now();
// mark the transactions as received
self.transaction_fetcher
.remove_hashes_from_transaction_fetcher(transactions.iter().map(|tx| tx.tx_hash()));
@@ -1459,6 +1461,8 @@ where
if num_already_seen_by_peer > 0 {
self.report_already_seen(peer_id);
}
self.metrics.pool_import_prepare_duration.record(start.elapsed());
}
/// Processes a [`FetchEvent`].

View File

@@ -164,7 +164,7 @@ pub use alloy_primitives::{logs_bloom, Log, LogData};
pub mod proofs;
mod storage;
pub use storage::StorageEntry;
pub use storage::{StorageEntry, ValueWithSubKey};
pub mod sync;

View File

@@ -1,5 +1,17 @@
use alloy_primitives::{B256, U256};
/// Trait for `DupSort` table values that contain a subkey.
///
/// This trait allows extracting the subkey from a value during database iteration,
/// enabling proper range queries and filtering on `DupSort` tables.
pub trait ValueWithSubKey {
/// The type of the subkey.
type SubKey;
/// Extract the subkey from the value.
fn get_subkey(&self) -> Self::SubKey;
}
/// Account storage entry.
///
/// `key` is the subkey when used as a value in the `StorageChangeSets` table.
@@ -21,6 +33,14 @@ impl StorageEntry {
}
}
impl ValueWithSubKey for StorageEntry {
type SubKey = B256;
fn get_subkey(&self) -> Self::SubKey {
self.key
}
}
impl From<(B256, U256)> for StorageEntry {
fn from((key, value): (B256, U256)) -> Self {
Self { key, value }

View File

@@ -107,18 +107,13 @@ impl RpcServiceT for RpcService {
fn batch<'a>(&self, req: Batch<'a>) -> impl Future<Output = Self::BatchResponse> + Send + 'a {
let entries: Vec<_> = req.into_iter().collect();
let mut got_notif = false;
let mut batch_response = BatchResponseBuilder::new_with_limit(self.max_response_body_size);
let mut pending_calls: FuturesOrdered<_> = entries
.into_iter()
.filter_map(|v| match v {
Ok(BatchEntry::Call(call)) => Some(Either::Right(self.call(call))),
Ok(BatchEntry::Notification(_n)) => {
got_notif = true;
None
}
Ok(BatchEntry::Notification(_n)) => None,
Err(_err) => Some(Either::Left(async {
MethodResponse::error(Id::Null, ErrorObject::from(ErrorCode::InvalidRequest))
})),

View File

@@ -110,7 +110,8 @@ pub trait EthFees:
// increasing and 0 <= p <= 100
// Note: The types used ensure that the percentiles are never < 0
if let Some(percentiles) = &reward_percentiles &&
percentiles.windows(2).any(|w| w[0] > w[1] || w[0] > 100.)
(percentiles.iter().any(|p| *p < 0.0 || *p > 100.0) ||
percentiles.windows(2).any(|w| w[0] > w[1]))
{
return Err(EthApiError::InvalidRewardPercentiles.into())
}

View File

@@ -11,6 +11,7 @@ use reth_errors::ProviderError;
use reth_primitives_traits::{BlockBody, RecoveredBlock, SignedTransaction};
use reth_storage_api::{BlockReader, ProviderBlock};
use std::sync::Arc;
use thiserror::Error;
/// Returns all matching of a block's receipts when the transaction hashes are known.
pub fn matching_block_logs_with_tx_hashes<'a, I, R>(
@@ -147,30 +148,40 @@ where
/// Computes the block range based on the filter range and current block numbers.
///
/// This returns `(min(best,from), min(best,to))`.
/// Returns an error for invalid ranges rather than silently clamping values.
pub fn get_filter_block_range(
from_block: Option<u64>,
to_block: Option<u64>,
start_block: u64,
info: ChainInfo,
) -> (u64, u64) {
let mut from_block_number = start_block;
let mut to_block_number = info.best_number;
) -> Result<(u64, u64), FilterBlockRangeError> {
let from_block_number = from_block.unwrap_or(start_block);
let to_block_number = to_block.unwrap_or(info.best_number);
// if a `from_block` argument is provided then the `from_block_number` is the converted value or
// the start block if the converted value is larger than the start block, since `from_block`
// can't be a future block: `min(head, from_block)`
if let Some(filter_from_block) = from_block {
from_block_number = start_block.min(filter_from_block)
// from > to is an invalid range
if from_block_number > to_block_number {
return Err(FilterBlockRangeError::InvalidBlockRange);
}
// upper end of the range is the converted `to_block` argument, restricted by the best block:
// `min(best_number,to_block_number)`
if let Some(filter_to_block) = to_block {
to_block_number = info.best_number.min(filter_to_block);
// we cannot query blocks that don't exist yet
if to_block_number > info.best_number {
return Err(FilterBlockRangeError::BlockRangeExceedsHead);
}
(from_block_number, to_block_number)
Ok((from_block_number, to_block_number))
}
/// Errors for filter block range validation.
///
/// See also <https://github.com/ethereum/go-ethereum/blob/master/eth/filters/filter.go#L224-L230>.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Error)]
pub enum FilterBlockRangeError {
/// `from_block > to_block`
#[error("invalid block range params")]
InvalidBlockRange,
/// Block range extends beyond current head
#[error("block range extends beyond current head block")]
BlockRangeExceedsHead,
}
#[cfg(test)]
@@ -184,44 +195,73 @@ mod tests {
let from = 14000000u64;
let to = 14000100u64;
let info = ChainInfo { best_number: 15000000, ..Default::default() };
let range = get_filter_block_range(Some(from), Some(to), info.best_number, info);
let range = get_filter_block_range(Some(from), Some(to), info.best_number, info).unwrap();
assert_eq!(range, (from, to));
}
#[test]
fn test_log_range_higher() {
let from = 15000001u64;
let to = 15000002u64;
let info = ChainInfo { best_number: 15000000, ..Default::default() };
let range = get_filter_block_range(Some(from), Some(to), info.best_number, info);
assert_eq!(range, (info.best_number, info.best_number));
}
#[test]
fn test_log_range_from() {
let from = 14000000u64;
let info = ChainInfo { best_number: 15000000, ..Default::default() };
let range = get_filter_block_range(Some(from), None, info.best_number, info);
let range = get_filter_block_range(Some(from), None, 0, info).unwrap();
assert_eq!(range, (from, info.best_number));
}
#[test]
fn test_log_range_to() {
let to = 14000000u64;
let start_block = 0u64;
let info = ChainInfo { best_number: 15000000, ..Default::default() };
let range = get_filter_block_range(None, Some(to), info.best_number, info);
assert_eq!(range, (info.best_number, to));
let range = get_filter_block_range(None, Some(to), start_block, info).unwrap();
assert_eq!(range, (start_block, to));
}
#[test]
fn test_log_range_higher_error() {
// Range extends beyond head -> should error instead of clamping
let from = 15000001u64;
let to = 15000002u64;
let info = ChainInfo { best_number: 15000000, ..Default::default() };
let err = get_filter_block_range(Some(from), Some(to), info.best_number, info).unwrap_err();
assert_eq!(err, FilterBlockRangeError::BlockRangeExceedsHead);
}
#[test]
fn test_log_range_to_below_start_error() {
// to_block < start_block, default from -> invalid range
let to = 14000000u64;
let info = ChainInfo { best_number: 15000000, ..Default::default() };
let err = get_filter_block_range(None, Some(to), info.best_number, info).unwrap_err();
assert_eq!(err, FilterBlockRangeError::InvalidBlockRange);
}
#[test]
fn test_log_range_empty() {
let info = ChainInfo { best_number: 15000000, ..Default::default() };
let range = get_filter_block_range(None, None, info.best_number, info);
let range = get_filter_block_range(None, None, info.best_number, info).unwrap();
// no range given -> head
assert_eq!(range, (info.best_number, info.best_number));
}
#[test]
fn test_invalid_block_range_error() {
let from = 100;
let to = 50;
let info = ChainInfo { best_number: 150, ..Default::default() };
let err = get_filter_block_range(Some(from), Some(to), 0, info).unwrap_err();
assert_eq!(err, FilterBlockRangeError::InvalidBlockRange);
}
#[test]
fn test_block_range_exceeds_head_error() {
let from = 100;
let to = 200;
let info = ChainInfo { best_number: 150, ..Default::default() };
let err = get_filter_block_range(Some(from), Some(to), 0, info).unwrap_err();
assert_eq!(err, FilterBlockRangeError::BlockRangeExceedsHead);
}
#[test]
fn parse_log_from_only() {
let s = r#"{"fromBlock":"0xf47a42","address":["0x7de93682b9b5d80d45cd371f7a14f74d49b0914c","0x0f00392fcb466c0e4e4310d81b941e07b4d5a079","0xebf67ab8cff336d3f609127e8bbf8bd6dd93cd81"],"topics":["0x0559884fd3a460db3073b7fc896cc77986f16e378210ded43186175bf646fc5f"]}"#;
@@ -242,7 +282,8 @@ mod tests {
to_block.and_then(alloy_rpc_types_eth::BlockNumberOrTag::as_number),
start_block,
info,
);
)
.unwrap();
assert_eq!(from_block_number, 16022082);
assert_eq!(to_block_number, best_number);
}

View File

@@ -267,7 +267,7 @@ where
.map(|num| self.provider().convert_block_number(num))
.transpose()?
.flatten();
logs_utils::get_filter_block_range(from, to, start_block, info)
logs_utils::get_filter_block_range(from, to, start_block, info)?
}
FilterBlockOption::AtBlockHash(_) => {
// blockHash is equivalent to fromBlock = toBlock = the block number with
@@ -561,7 +561,7 @@ where
}
let (from_block_number, to_block_number) =
logs_utils::get_filter_block_range(from, to, start_block, info);
logs_utils::get_filter_block_range(from, to, start_block, info)?;
self.get_logs_in_block_range(filter, from_block_number, to_block_number, limits)
.await
@@ -952,6 +952,15 @@ impl From<ProviderError> for EthFilterError {
}
}
impl From<logs_utils::FilterBlockRangeError> for EthFilterError {
fn from(err: logs_utils::FilterBlockRangeError) -> Self {
match err {
logs_utils::FilterBlockRangeError::InvalidBlockRange => Self::InvalidBlockRangeParams,
logs_utils::FilterBlockRangeError::BlockRangeExceedsHead => Self::BlockRangeExceedsHead,
}
}
}
/// Helper type for the common pattern of returning receipts, block and the original header that is
/// a match for the filter.
struct ReceiptBlockResult<P>

View File

@@ -14,7 +14,9 @@ use reth_network_p2p::headers::{
downloader::{HeaderDownloader, HeaderSyncGap, SyncTarget},
error::HeadersDownloaderError,
};
use reth_primitives_traits::{serde_bincode_compat, FullBlockHeader, NodePrimitives, SealedHeader};
use reth_primitives_traits::{
serde_bincode_compat, FullBlockHeader, HeaderTy, NodePrimitives, SealedHeader,
};
use reth_provider::{
providers::StaticFileWriter, BlockHashReader, DBProvider, HeaderSyncGapProvider,
StaticFileProviderFactory,
@@ -333,8 +335,9 @@ where
(input.unwind_to + 1)..,
)?;
provider.tx_ref().unwind_table_by_num::<tables::CanonicalHeaders>(input.unwind_to)?;
let unfinalized_headers_unwound =
provider.tx_ref().unwind_table_by_num::<tables::Headers>(input.unwind_to)?;
let unfinalized_headers_unwound = provider.tx_ref().unwind_table_by_num::<tables::Headers<
HeaderTy<Provider::Primitives>,
>>(input.unwind_to)?;
// determine how many headers to unwind from the static files based on the highest block and
// the unwind_to block

View File

@@ -94,7 +94,10 @@ pub trait TableViewer<R> {
/// Operate on the dupsort table in a generic way.
///
/// By default, the `view` function is invoked unless overridden.
fn view_dupsort<T: DupSort>(&self) -> Result<R, Self::Error> {
fn view_dupsort<T: DupSort>(&self) -> Result<R, Self::Error>
where
T::Value: reth_primitives_traits::ValueWithSubKey<SubKey = T::SubKey>,
{
self.view::<T>()
}
}

View File

@@ -1,5 +1,5 @@
use alloy_primitives::Address;
use reth_primitives_traits::Account;
use reth_primitives_traits::{Account, ValueWithSubKey};
/// Account as it is saved in the database.
///
@@ -15,6 +15,14 @@ pub struct AccountBeforeTx {
pub info: Option<Account>,
}
impl ValueWithSubKey for AccountBeforeTx {
type SubKey = Address;
fn get_subkey(&self) -> Self::SubKey {
self.address
}
}
// NOTE: Removing reth_codec and manually encode subkey
// and compress second part of the value. If we have compression
// over whole value (Even SubKey) that would mess up fetching of values with seek_by_key_subkey

View File

@@ -1046,7 +1046,7 @@ impl<N: ProviderNodeTypes> ReceiptProvider for ConsistentProvider<N> {
id.into(),
|provider| provider.receipt(id),
|tx_index, _, block_state| {
Ok(block_state.executed_block_receipts().get(tx_index).cloned())
Ok(block_state.executed_block_receipts_ref().get(tx_index).cloned())
},
)
}
@@ -1055,7 +1055,7 @@ impl<N: ProviderNodeTypes> ReceiptProvider for ConsistentProvider<N> {
for block_state in self.head_block.iter().flat_map(|b| b.chain()) {
let executed_block = block_state.block_ref();
let block = executed_block.recovered_block();
let receipts = block_state.executed_block_receipts();
let receipts = block_state.executed_block_receipts_ref();
// assuming 1:1 correspondence between transactions and receipts
debug_assert_eq!(

View File

@@ -66,6 +66,12 @@ impl SenderId {
std::ops::Bound::Included(TransactionId::new(self, 0))
}
/// Returns a `Range` for [`TransactionId`] starting with nonce `0` and ending with nonce
/// `u64::MAX`
pub const fn range(self) -> std::ops::RangeInclusive<TransactionId> {
TransactionId::new(self, 0)..=TransactionId::new(self, u64::MAX)
}
/// Converts the sender to a [`TransactionId`] with the given nonce.
pub const fn into_transaction_id(self, nonce: u64) -> TransactionId {
TransactionId::new(self, nonce)

View File

@@ -186,11 +186,11 @@ impl<T: ParkedOrd> ParkedPool<T> {
{
// NOTE: This will not panic due to `!last_sender_transaction.is_empty()`
let sender_id = self.last_sender_submission.last().unwrap().sender_id;
let list = self.get_txs_by_sender(sender_id);
// Drop transactions from this sender until the pool is under limits
for txid in list.into_iter().rev() {
if let Some(tx) = self.remove_transaction(&txid) {
while let Some((tx_id, _)) = self.by_id.range(sender_id.range()).next_back() {
let tx_id = *tx_id;
if let Some(tx) = self.remove_transaction(&tx_id) {
removed.push(tx);
}

View File

@@ -283,6 +283,16 @@ pub trait TransactionPool: Clone + Debug + Send + Sync {
NewSubpoolTransactionStream::new(self.new_transactions_listener(), SubPool::Queued)
}
/// Returns a new Stream that yields new transactions added to the blob sub-pool.
///
/// This is a convenience wrapper around [`Self::new_transactions_listener`] that filters for
/// [`SubPool::Blob`](crate::SubPool).
fn new_blob_pool_transactions_listener(
&self,
) -> NewSubpoolTransactionStream<Self::Transaction> {
NewSubpoolTransactionStream::new(self.new_transactions_listener(), SubPool::Blob)
}
/// Returns the _hashes_ of all transactions in the pool that are allowed to be propagated.
///
/// This excludes hashes that aren't allowed to be propagated.
@@ -1583,7 +1593,7 @@ pub struct PoolSize {
pub queued_size: usize,
/// Number of all transactions of all sub-pools
///
/// Note: this is the sum of ```pending + basefee + queued```
/// Note: this is the sum of ```pending + basefee + queued + blob```
pub total: usize,
}

View File

@@ -31,6 +31,7 @@ nybbles = { workspace = true, features = ["rlp"] }
# reth
revm-database.workspace = true
revm-state.workspace = true
# `serde` feature
serde = { workspace = true, optional = true }
@@ -64,7 +65,6 @@ bincode.workspace = true
serde.workspace = true
serde_json.workspace = true
serde_with.workspace = true
revm-state.workspace = true
[features]
default = ["std"]

View File

@@ -83,6 +83,19 @@ impl MultiAddedRemovedKeys {
self.storages.entry(address).or_insert_with(default_added_removed_keys);
}
}
/// Marks an account as removed.
pub fn mark_account_removed(&mut self, account: B256) {
self.account.insert_removed(account);
}
/// Marks a storage slot as removed for the given account.
pub fn mark_storage_removed(&mut self, hashed_address: B256, slot: B256) {
self.storages
.entry(hashed_address)
.or_insert_with(default_added_removed_keys)
.insert_removed(slot);
}
}
#[cfg(test)]
@@ -184,6 +197,147 @@ mod tests {
assert!(!multi_keys.get_accounts().is_removed(&addr));
}
#[test]
fn test_record_removals_is_monotonic() {
use alloy_primitives::Address;
use revm_state::{Account, AccountInfo, AccountStatus, EvmStorageSlot};
let mut multi_keys = MultiAddedRemovedKeys::new();
let address = Address::random();
let slot = U256::from(42);
let hashed_addr = keccak256(address);
let hashed_slot = keccak256(B256::from(slot));
// Update 1: Create slot with value 100
let mut update1 = EvmState::default();
update1.insert(
address,
Account {
info: AccountInfo::default(),
transaction_id: 0,
storage: std::iter::once((
slot,
EvmStorageSlot::new_changed(U256::ZERO, U256::from(100), 0),
))
.collect(),
status: AccountStatus::Touched,
},
);
multi_keys.record_removals(&update1);
// Slot should NOT be marked as removed (value is 100, not 0)
assert!(
multi_keys.get_storage(&hashed_addr).is_none() ||
!multi_keys.get_storage(&hashed_addr).unwrap().is_removed(&hashed_slot)
);
// Update 2: Delete slot (set to 0)
let mut update2 = EvmState::default();
update2.insert(
address,
Account {
info: AccountInfo::default(),
transaction_id: 1,
storage: std::iter::once((
slot,
EvmStorageSlot::new_changed(U256::from(100), U256::ZERO, 1),
))
.collect(),
status: AccountStatus::Touched,
},
);
multi_keys.record_removals(&update2);
// Slot should be marked as removed
assert!(multi_keys.get_storage(&hashed_addr).unwrap().is_removed(&hashed_slot));
// Update 3: Recreate slot with value 200
let mut update3 = EvmState::default();
update3.insert(
address,
Account {
info: AccountInfo::default(),
transaction_id: 2,
storage: std::iter::once((
slot,
EvmStorageSlot::new_changed(U256::ZERO, U256::from(200), 2),
))
.collect(),
status: AccountStatus::Touched,
},
);
multi_keys.record_removals(&update3);
// KEY ASSERTION: Still removed after recreation!
// This is the critical difference from update_with_state.
// Removals are monotonic - once removed, stays removed for proof invalidation.
assert!(
multi_keys.get_storage(&hashed_addr).unwrap().is_removed(&hashed_slot),
"slot should remain marked as removed even after recreation"
);
}
#[test]
fn test_record_removals_selfdestruct() {
use alloy_primitives::Address;
use revm_state::{Account, AccountInfo, AccountStatus};
let mut multi_keys = MultiAddedRemovedKeys::new();
let address = Address::random();
let hashed_addr = keccak256(address);
// Selfdestruct the account (must also be Touched to be processed)
let mut update = EvmState::default();
update.insert(
address,
Account {
info: AccountInfo::default(),
transaction_id: 0,
storage: Default::default(),
status: AccountStatus::SelfDestructed | AccountStatus::Touched,
},
);
multi_keys.record_removals(&update);
// Account should be marked as removed
assert!(multi_keys.get_accounts().is_removed(&hashed_addr));
}
#[test]
fn test_record_removals_ignores_untouched() {
use alloy_primitives::Address;
use revm_state::{Account, AccountInfo, AccountStatus, EvmStorageSlot};
let mut multi_keys = MultiAddedRemovedKeys::new();
let address = Address::random();
let slot = U256::from(1);
let hashed_addr = keccak256(address);
let hashed_slot = keccak256(B256::from(slot));
// Create an untouched account with zero storage
let mut update = EvmState::default();
update.insert(
address,
Account {
info: AccountInfo::default(),
transaction_id: 0,
storage: std::iter::once((
slot,
EvmStorageSlot::new_changed(U256::from(100), U256::ZERO, 0),
))
.collect(),
status: AccountStatus::default(), // NOT touched
},
);
multi_keys.record_removals(&update);
// Should NOT be marked as removed because account wasn't touched
assert!(
multi_keys.get_storage(&hashed_addr).is_none() ||
!multi_keys.get_storage(&hashed_addr).unwrap().is_removed(&hashed_slot)
);
}
#[test]
fn test_update_with_state_account_with_balance() {
let mut multi_keys = MultiAddedRemovedKeys::new();

View File

@@ -61,9 +61,6 @@ impl TrieInput {
}
/// Extend the trie input with the provided blocks, from oldest to newest.
///
/// For blocks with missing trie updates, the trie input will be extended with prefix sets
/// constructed from the state of this block and the state itself, **without** trie updates.
pub fn extend_with_blocks<'a>(
&mut self,
blocks: impl IntoIterator<Item = (&'a HashedPostState, &'a TrieUpdates)>,

View File

@@ -1,4 +1,5 @@
use super::{BranchNodeCompact, StoredNibblesSubKey};
use reth_primitives_traits::ValueWithSubKey;
/// Account storage trie node.
///
@@ -12,6 +13,14 @@ pub struct StorageTrieEntry {
pub node: BranchNodeCompact,
}
impl ValueWithSubKey for StorageTrieEntry {
type SubKey = StoredNibblesSubKey;
fn get_subkey(&self) -> Self::SubKey {
self.nibbles.clone()
}
}
// NOTE: Removing reth_codec and manually encode subkey
// and compress second part of the value. If we have compression
// over whole value (Even SubKey) that would mess up fetching of values with seek_by_key_subkey
@@ -46,6 +55,14 @@ pub struct TrieChangeSetsEntry {
pub node: Option<BranchNodeCompact>,
}
impl ValueWithSubKey for TrieChangeSetsEntry {
type SubKey = StoredNibblesSubKey;
fn get_subkey(&self) -> Self::SubKey {
self.nibbles.clone()
}
}
#[cfg(any(test, feature = "reth-codec"))]
impl reth_codecs::Compact for TrieChangeSetsEntry {
fn to_compact<B>(&self, buf: &mut B) -> usize

View File

@@ -126,8 +126,7 @@ impl ParallelProof {
)))
})?;
// Extract storage proof directly from the result.
// The proof is Arc-wrapped for efficient batch sharing, so we unwrap it here.
// Extract storage proof directly from the result
let storage_proof = match proof_msg.result? {
crate::proof_task::ProofResult::StorageProof { hashed_address: addr, proof } => {
debug_assert_eq!(
@@ -135,8 +134,7 @@ impl ParallelProof {
hashed_address,
"storage worker must return same address: expected {hashed_address}, got {addr}"
);
// Efficiently unwrap Arc: returns inner value if sole owner, clones otherwise.
Arc::try_unwrap(proof).unwrap_or_else(|arc| (*arc).clone())
proof
}
crate::proof_task::ProofResult::AccountMultiproof { .. } => {
unreachable!("storage worker only sends StorageProof variant")
@@ -225,12 +223,8 @@ impl ParallelProof {
)
})?;
// The proof is Arc-wrapped for efficient batch sharing, so we unwrap it here.
let (multiproof, stats) = match proof_result_msg.result? {
crate::proof_task::ProofResult::AccountMultiproof { proof, stats } => {
// Efficiently unwrap Arc: returns inner value if sole owner, clones otherwise.
(Arc::try_unwrap(proof).unwrap_or_else(|arc| (*arc).clone()), stats)
}
crate::proof_task::ProofResult::AccountMultiproof { proof, stats } => (proof, stats),
crate::proof_task::ProofResult::StorageProof { .. } => {
unreachable!("account worker only sends AccountMultiproof variant")
}

View File

@@ -41,7 +41,6 @@ use alloy_primitives::{
use alloy_rlp::{BufMut, Encodable};
use crossbeam_channel::{unbounded, Receiver as CrossbeamReceiver, Sender as CrossbeamSender};
use dashmap::DashMap;
use metrics::Histogram;
use reth_execution_errors::{SparseTrieError, SparseTrieErrorKind};
use reth_provider::{DatabaseProviderROFactory, ProviderError, ProviderResult};
use reth_storage_errors::db::DatabaseError;
@@ -80,275 +79,6 @@ use crate::proof_task_metrics::{
type StorageProofResult = Result<DecodedStorageMultiProof, ParallelStateRootError>;
type TrieNodeProviderResult = Result<Option<RevealedNode>, SparseTrieError>;
/// Maximum number of storage proof jobs to batch together per account.
const STORAGE_PROOF_BATCH_LIMIT: usize = 32;
/// Maximum number of blinded node requests to defer during storage proof batching.
/// When this limit is reached, batching stops early to process deferred nodes,
/// preventing starvation of blinded node requests under high proof load.
const MAX_DEFERRED_BLINDED_NODES: usize = 16;
/// Holds batched storage proof jobs for the same account.
///
/// When multiple storage proof requests arrive for the same account, they can be merged
/// into a single proof computation with combined prefix sets and target slots.
#[derive(Debug)]
struct BatchedStorageProof {
/// The merged prefix set from all batched jobs.
prefix_set: PrefixSetMut,
/// The merged target slots from all batched jobs.
target_slots: B256Set,
/// Whether any job requested branch node masks.
with_branch_node_masks: bool,
/// The `multi_added_removed_keys` from the first job (they should all share the same `Arc`).
multi_added_removed_keys: Option<Arc<MultiAddedRemovedKeys>>,
/// All senders that need to receive the result.
senders: Vec<ProofResultContext>,
}
impl BatchedStorageProof {
/// Creates a new batch from the first storage proof input.
fn new(input: StorageProofInput, sender: ProofResultContext) -> Self {
// Convert frozen PrefixSet to mutable PrefixSetMut by collecting its keys.
let prefix_set = PrefixSetMut::from(input.prefix_set.iter().copied());
Self {
prefix_set,
target_slots: input.target_slots,
with_branch_node_masks: input.with_branch_node_masks,
multi_added_removed_keys: input.multi_added_removed_keys,
senders: vec![sender],
}
}
/// Merges another storage proof job into this batch.
///
/// # Panics
/// Panics if `input.multi_added_removed_keys` does not point to the same Arc as the batch's.
/// This is a critical invariant for proof correctness.
fn merge(&mut self, input: StorageProofInput, sender: ProofResultContext) {
// Validate that all batched jobs share the same multi_added_removed_keys Arc.
// This is a critical invariant: if jobs have different keys, the merged proof
// would be computed with only the first job's keys, producing incorrect results.
// Using assert! (not debug_assert!) because incorrect proofs could cause consensus
// failures.
assert!(
match (&self.multi_added_removed_keys, &input.multi_added_removed_keys) {
(Some(a), Some(b)) => Arc::ptr_eq(a, b),
(None, None) => true,
_ => false,
},
"All batched storage proof jobs must share the same multi_added_removed_keys Arc"
);
self.prefix_set.extend_keys(input.prefix_set.iter().copied());
self.target_slots.extend(input.target_slots);
self.with_branch_node_masks |= input.with_branch_node_masks;
self.senders.push(sender);
}
/// Converts this batch into a single `StorageProofInput` for computation.
fn into_input(self, hashed_address: B256) -> (StorageProofInput, Vec<ProofResultContext>) {
let input = StorageProofInput {
hashed_address,
prefix_set: self.prefix_set.freeze(),
target_slots: self.target_slots,
with_branch_node_masks: self.with_branch_node_masks,
multi_added_removed_keys: self.multi_added_removed_keys,
};
(input, self.senders)
}
}
/// Metrics for storage worker batching.
#[derive(Clone, Default)]
struct StorageWorkerBatchMetrics {
/// Histogram of batch sizes (number of jobs merged per computation).
#[cfg(feature = "metrics")]
batch_size_histogram: Option<Histogram>,
}
impl StorageWorkerBatchMetrics {
#[cfg(feature = "metrics")]
fn new() -> Self {
Self {
batch_size_histogram: Some(metrics::histogram!(
"trie.proof_task.storage_worker_batch_size"
)),
}
}
#[cfg(not(feature = "metrics"))]
fn new() -> Self {
Self {}
}
fn record_batch_size(&self, _size: usize) {
#[cfg(feature = "metrics")]
if let Some(h) = &self.batch_size_histogram {
h.record(_size as f64);
}
}
}
/// Maximum number of account multiproof jobs to batch together.
const ACCOUNT_PROOF_BATCH_LIMIT: usize = 32;
/// Holds batched account multiproof jobs.
///
/// When multiple account multiproof requests arrive, they can be merged
/// into a single proof computation with combined targets and prefix sets.
#[derive(Debug)]
struct BatchedAccountProof {
/// The merged targets from all batched jobs.
targets: MultiProofTargets,
/// The merged account prefix set from all batched jobs.
account_prefix_set: PrefixSetMut,
/// The merged storage prefix sets from all batched jobs.
storage_prefix_sets: B256Map<PrefixSetMut>,
/// The merged destroyed accounts from all batched jobs.
destroyed_accounts: B256Set,
/// Whether any job requested branch node masks.
collect_branch_node_masks: bool,
/// The `multi_added_removed_keys` from the first job (they should all share the same `Arc`).
multi_added_removed_keys: Option<Arc<MultiAddedRemovedKeys>>,
/// The shared `missed_leaves_storage_roots` cache from the first job.
missed_leaves_storage_roots: Arc<DashMap<B256, B256>>,
/// All senders that need to receive the result.
senders: Vec<ProofResultContext>,
}
impl BatchedAccountProof {
/// Creates a new batch from the first account multiproof input.
fn new(input: AccountMultiproofInput) -> Self {
// Convert frozen prefix sets to mutable versions.
let account_prefix_set =
PrefixSetMut::from(input.prefix_sets.account_prefix_set.iter().copied());
let storage_prefix_sets = input
.prefix_sets
.storage_prefix_sets
.into_iter()
.map(|(addr, ps)| (addr, PrefixSetMut::from(ps.iter().copied())))
.collect();
let destroyed_accounts = input.prefix_sets.destroyed_accounts;
Self {
targets: input.targets,
account_prefix_set,
storage_prefix_sets,
destroyed_accounts,
collect_branch_node_masks: input.collect_branch_node_masks,
multi_added_removed_keys: input.multi_added_removed_keys,
missed_leaves_storage_roots: input.missed_leaves_storage_roots,
senders: vec![input.proof_result_sender],
}
}
/// Attempts to merge another account multiproof job into this batch.
///
/// Returns the job back if caches are incompatible so the caller can process it separately.
fn try_merge(&mut self, input: AccountMultiproofInput) -> Result<(), AccountMultiproofInput> {
// Require all jobs to share the same caches; otherwise merging would produce
// incorrect proofs by reusing the wrong retained keys or missed-leaf storage roots.
let multi_added_removed_keys_mismatch =
!match (&self.multi_added_removed_keys, &input.multi_added_removed_keys) {
(Some(a), Some(b)) => Arc::ptr_eq(a, b),
(None, None) => true,
_ => false,
};
if multi_added_removed_keys_mismatch ||
!Arc::ptr_eq(&self.missed_leaves_storage_roots, &input.missed_leaves_storage_roots)
{
return Err(input);
}
// Merge targets.
self.targets.extend(input.targets);
// Merge account prefix set.
self.account_prefix_set.extend_keys(input.prefix_sets.account_prefix_set.iter().copied());
// Merge storage prefix sets.
for (addr, ps) in input.prefix_sets.storage_prefix_sets {
match self.storage_prefix_sets.entry(addr) {
alloy_primitives::map::Entry::Occupied(mut entry) => {
entry.get_mut().extend_keys(ps.iter().copied());
}
alloy_primitives::map::Entry::Vacant(entry) => {
entry.insert(PrefixSetMut::from(ps.iter().copied()));
}
}
}
// Merge destroyed accounts.
self.destroyed_accounts.extend(input.prefix_sets.destroyed_accounts);
// OR the branch node masks flag.
self.collect_branch_node_masks |= input.collect_branch_node_masks;
// Collect the sender.
self.senders.push(input.proof_result_sender);
Ok(())
}
/// Converts this batch into a single `AccountMultiproofInput` for computation.
fn into_input(self) -> (AccountMultiproofInput, Vec<ProofResultContext>) {
// Freeze the mutable prefix sets.
let storage_prefix_sets: B256Map<PrefixSet> =
self.storage_prefix_sets.into_iter().map(|(addr, ps)| (addr, ps.freeze())).collect();
let prefix_sets = TriePrefixSets {
account_prefix_set: self.account_prefix_set.freeze(),
storage_prefix_sets,
destroyed_accounts: self.destroyed_accounts,
};
// Use a dummy sender for the input since we'll handle all senders separately.
let dummy_sender = self.senders.first().expect("batch always has at least one sender");
let input = AccountMultiproofInput {
targets: self.targets,
prefix_sets,
collect_branch_node_masks: self.collect_branch_node_masks,
multi_added_removed_keys: self.multi_added_removed_keys,
missed_leaves_storage_roots: self.missed_leaves_storage_roots,
proof_result_sender: dummy_sender.clone(),
};
(input, self.senders)
}
}
/// Metrics for account worker batching.
#[derive(Clone, Default)]
struct AccountWorkerBatchMetrics {
/// Histogram of batch sizes (number of jobs merged per computation).
#[cfg(feature = "metrics")]
batch_size_histogram: Option<Histogram>,
}
impl AccountWorkerBatchMetrics {
#[cfg(feature = "metrics")]
fn new() -> Self {
Self {
batch_size_histogram: Some(metrics::histogram!(
"trie.proof_task.account_worker_batch_size"
)),
}
}
#[cfg(not(feature = "metrics"))]
fn new() -> Self {
Self {}
}
fn record_batch_size(&self, _size: usize) {
#[cfg(feature = "metrics")]
if let Some(h) = &self.batch_size_histogram {
h.record(_size as f64);
}
}
}
/// A handle that provides type-safe access to proof worker pools.
///
/// The handle stores direct senders to both storage and account worker pools,
@@ -822,16 +552,12 @@ impl TrieNodeProvider for ProofTaskTrieNodeProvider {
}
}
/// Result of a proof calculation, which can be either an account multiproof or a storage proof.
///
/// The proof data is wrapped in `Arc` to enable efficient sharing when batching multiple
/// proof requests. This avoids expensive cloning of the underlying proof structures
/// when sending results to multiple receivers.
#[derive(Debug, Clone)]
#[derive(Debug)]
pub enum ProofResult {
/// Account multiproof with statistics
AccountMultiproof {
/// The account multiproof (Arc-wrapped for efficient sharing in batches)
proof: Arc<DecodedMultiProof>,
/// The account multiproof
proof: DecodedMultiProof,
/// Statistics collected during proof computation
stats: ParallelTrieStats,
},
@@ -839,8 +565,8 @@ pub enum ProofResult {
StorageProof {
/// The hashed address this storage proof belongs to
hashed_address: B256,
/// The storage multiproof (Arc-wrapped for efficient sharing in batches)
proof: Arc<DecodedStorageMultiProof>,
/// The storage multiproof
proof: DecodedStorageMultiProof,
},
}
@@ -849,17 +575,11 @@ impl ProofResult {
///
/// For account multiproofs, returns the multiproof directly (discarding stats).
/// For storage proofs, wraps the storage proof into a minimal multiproof.
///
/// Note: This method clones the inner proof data. If you need to avoid the clone
/// when you're the sole owner, consider using `Arc::try_unwrap` first.
pub fn into_multiproof(self) -> DecodedMultiProof {
match self {
Self::AccountMultiproof { proof, stats: _ } => {
Arc::try_unwrap(proof).unwrap_or_else(|arc| (*arc).clone())
}
Self::AccountMultiproof { proof, stats: _ } => proof,
Self::StorageProof { hashed_address, proof } => {
let storage_proof = Arc::try_unwrap(proof).unwrap_or_else(|arc| (*arc).clone());
DecodedMultiProof::from_storage_proof(hashed_address, storage_proof)
DecodedMultiProof::from_storage_proof(hashed_address, proof)
}
}
}
@@ -988,18 +708,11 @@ where
/// 2. Advertises availability
/// 3. Processes jobs in a loop:
/// - Receives job from channel
/// - Drains additional same-account storage proof jobs (batching)
/// - Marks worker as busy
/// - Processes the batched jobs as a single proof computation
/// - Processes the job
/// - Marks worker as available
/// 4. Shuts down when channel closes
///
/// # Batching Strategy
///
/// When multiple storage proof requests arrive for the same account, they are merged
/// into a single proof computation. This reduces redundant trie traversals when state
/// updates arrive faster than proof computation can process them.
///
/// # Panic Safety
///
/// If this function panics, the worker thread terminates but other workers
@@ -1019,7 +732,6 @@ where
// Create provider from factory
let provider = task_ctx.factory.database_provider_ro()?;
let proof_tx = ProofTaskTx::new(provider, worker_id);
let batch_metrics = StorageWorkerBatchMetrics::new();
trace!(
target: "trie::proof_task",
@@ -1034,104 +746,20 @@ where
// Initially mark this worker as available.
available_workers.fetch_add(1, Ordering::Relaxed);
// Deferred blinded node jobs to process after batched storage proofs.
// Pre-allocate with capacity to avoid reallocations during batching.
let mut deferred_blinded_nodes: Vec<(B256, Nibbles, Sender<TrieNodeProviderResult>)> =
Vec::with_capacity(MAX_DEFERRED_BLINDED_NODES);
while let Ok(job) = work_rx.recv() {
// Mark worker as busy.
available_workers.fetch_sub(1, Ordering::Relaxed);
match job {
StorageWorkerJob::StorageProof { input, proof_result_sender } => {
// Start batching: group storage proofs by account.
let mut batches: B256Map<BatchedStorageProof> = B256Map::default();
batches.insert(
input.hashed_address,
BatchedStorageProof::new(input, proof_result_sender),
Self::process_storage_proof(
worker_id,
&proof_tx,
input,
proof_result_sender,
&mut storage_proofs_processed,
&mut cursor_metrics_cache,
);
let mut total_jobs = 1usize;
// Drain additional jobs from the queue.
while total_jobs < STORAGE_PROOF_BATCH_LIMIT {
match work_rx.try_recv() {
Ok(StorageWorkerJob::StorageProof {
input: next_input,
proof_result_sender: next_sender,
}) => {
total_jobs += 1;
let addr = next_input.hashed_address;
match batches.entry(addr) {
alloy_primitives::map::Entry::Occupied(mut entry) => {
entry.get_mut().merge(next_input, next_sender);
}
alloy_primitives::map::Entry::Vacant(entry) => {
entry.insert(BatchedStorageProof::new(
next_input,
next_sender,
));
}
}
}
Ok(StorageWorkerJob::BlindedStorageNode {
account,
path,
result_sender,
}) => {
// Defer blinded node jobs to process after batched proofs.
deferred_blinded_nodes.push((account, path, result_sender));
// Stop batching if too many blinded nodes are deferred to prevent
// starvation.
if deferred_blinded_nodes.len() >= MAX_DEFERRED_BLINDED_NODES {
break;
}
}
Err(_) => break,
}
}
// Process all batched storage proofs.
for (hashed_address, batch) in batches {
let batch_size = batch.senders.len();
batch_metrics.record_batch_size(batch_size);
let (merged_input, senders) = batch.into_input(hashed_address);
trace!(
target: "trie::proof_task",
worker_id,
?hashed_address,
batch_size,
prefix_set_len = merged_input.prefix_set.len(),
target_slots_len = merged_input.target_slots.len(),
"Processing batched storage proof"
);
Self::process_batched_storage_proof(
worker_id,
&proof_tx,
hashed_address,
merged_input,
senders,
&mut storage_proofs_processed,
&mut cursor_metrics_cache,
);
}
// Process any deferred blinded node jobs.
for (account, path, result_sender) in
std::mem::take(&mut deferred_blinded_nodes)
{
Self::process_blinded_node(
worker_id,
&proof_tx,
account,
path,
result_sender,
&mut storage_nodes_processed,
);
}
}
StorageWorkerJob::BlindedStorageNode { account, path, result_sender } => {
@@ -1167,103 +795,82 @@ where
Ok(())
}
/// Processes a batched storage proof request and sends results to all waiting receivers.
///
/// This computes a single storage proof with merged targets and sends the same result
/// to all original requestors, reducing redundant trie traversals.
fn process_batched_storage_proof<Provider>(
/// Processes a storage proof request.
fn process_storage_proof<Provider>(
worker_id: usize,
proof_tx: &ProofTaskTx<Provider>,
hashed_address: B256,
input: StorageProofInput,
senders: Vec<ProofResultContext>,
proof_result_sender: ProofResultContext,
storage_proofs_processed: &mut u64,
cursor_metrics_cache: &mut ProofTaskCursorMetricsCache,
) where
Provider: TrieCursorFactory + HashedCursorFactory,
{
let hashed_address = input.hashed_address;
let ProofResultContext { sender, sequence_number: seq, state, start_time } =
proof_result_sender;
let mut trie_cursor_metrics = TrieCursorMetricsCache::default();
let mut hashed_cursor_metrics = HashedCursorMetricsCache::default();
trace!(
target: "trie::proof_task",
worker_id,
hashed_address = ?hashed_address,
prefix_set_len = input.prefix_set.len(),
target_slots_len = input.target_slots.len(),
"Processing storage proof"
);
let proof_start = Instant::now();
let result = proof_tx.compute_storage_proof(
input,
&mut trie_cursor_metrics,
&mut hashed_cursor_metrics,
);
let proof_elapsed = proof_start.elapsed();
*storage_proofs_processed += 1;
// Send the result to all waiting receivers.
let num_senders = senders.len();
match result {
Ok(storage_proof) => {
// Success case: wrap proof in Arc for efficient sharing across all senders.
let proof_result =
ProofResult::StorageProof { hashed_address, proof: Arc::new(storage_proof) };
let result_msg = result.map(|storage_proof| ProofResult::StorageProof {
hashed_address,
proof: storage_proof,
});
for ProofResultContext { sender, sequence_number, state, start_time } in senders {
*storage_proofs_processed += 1;
if sender
.send(ProofResultMessage {
sequence_number,
result: Ok(proof_result.clone()),
elapsed: start_time.elapsed(),
state,
})
.is_err()
{
trace!(
target: "trie::proof_task",
worker_id,
?hashed_address,
sequence_number,
"Proof result receiver dropped, discarding result"
);
}
}
}
Err(error) => {
// Error case: convert to string for cloning, then send to all receivers.
let error_msg = error.to_string();
for ProofResultContext { sender, sequence_number, state, start_time } in senders {
*storage_proofs_processed += 1;
if sender
.send(ProofResultMessage {
sequence_number,
result: Err(ParallelStateRootError::Other(error_msg.clone())),
elapsed: start_time.elapsed(),
state,
})
.is_err()
{
trace!(
target: "trie::proof_task",
worker_id,
?hashed_address,
sequence_number,
"Proof result receiver dropped, discarding result"
);
}
}
}
if sender
.send(ProofResultMessage {
sequence_number: seq,
result: result_msg,
elapsed: start_time.elapsed(),
state,
})
.is_err()
{
trace!(
target: "trie::proof_task",
worker_id,
hashed_address = ?hashed_address,
storage_proofs_processed,
"Proof result receiver dropped, discarding result"
);
}
trace!(
target: "trie::proof_task",
worker_id,
?hashed_address,
hashed_address = ?hashed_address,
proof_time_us = proof_elapsed.as_micros(),
num_senders,
total_processed = storage_proofs_processed,
trie_cursor_duration_us = trie_cursor_metrics.total_duration.as_micros(),
hashed_cursor_duration_us = hashed_cursor_metrics.total_duration.as_micros(),
"Batched storage proof completed"
?trie_cursor_metrics,
?hashed_cursor_metrics,
"Storage proof completed"
);
#[cfg(feature = "metrics")]
{
// Accumulate per-proof metrics into the worker's cache
let per_proof_cache = ProofTaskCursorMetricsCache {
account_trie_cursor: TrieCursorMetricsCache::default(),
account_hashed_cursor: HashedCursorMetricsCache::default(),
@@ -1380,18 +987,11 @@ where
/// 2. Advertises availability
/// 3. Processes jobs in a loop:
/// - Receives job from channel
/// - Drains additional account multiproof jobs (batching)
/// - Marks worker as busy
/// - Processes the batched jobs as a single proof computation
/// - Processes the job
/// - Marks worker as available
/// 4. Shuts down when channel closes
///
/// # Batching Strategy
///
/// When multiple account multiproof requests arrive, they are merged into
/// a single proof computation. This reduces redundant trie traversals when
/// state updates arrive faster than proof computation can process them.
///
/// # Panic Safety
///
/// If this function panics, the worker thread terminates but other workers
@@ -1412,7 +1012,6 @@ where
// Create provider from factory
let provider = task_ctx.factory.database_provider_ro()?;
let proof_tx = ProofTaskTx::new(provider, worker_id);
let batch_metrics = AccountWorkerBatchMetrics::new();
trace!(
target: "trie::proof_task",
@@ -1427,98 +1026,20 @@ where
// Count this worker as available only after successful initialization.
available_workers.fetch_add(1, Ordering::Relaxed);
// Deferred blinded node jobs to process after batched account proofs.
// Pre-allocate with capacity to avoid reallocations during batching.
let mut deferred_blinded_nodes: Vec<(Nibbles, Sender<TrieNodeProviderResult>)> =
Vec::with_capacity(MAX_DEFERRED_BLINDED_NODES);
while let Ok(job) = work_rx.recv() {
// Mark worker as busy.
available_workers.fetch_sub(1, Ordering::Relaxed);
match job {
AccountWorkerJob::AccountMultiproof { input } => {
// Start batching: accumulate account multiproof jobs. If we encounter an
// incompatible job (different caches), process it as a separate batch.
let mut next_account_job: Option<Box<AccountMultiproofInput>> = Some(input);
while let Some(account_job) = next_account_job.take() {
let mut batch = BatchedAccountProof::new(*account_job);
let mut pending_incompatible: Option<Box<AccountMultiproofInput>> = None;
// Drain additional jobs from the queue.
while batch.senders.len() < ACCOUNT_PROOF_BATCH_LIMIT {
match work_rx.try_recv() {
Ok(AccountWorkerJob::AccountMultiproof { input: next_input }) => {
match batch.try_merge(*next_input) {
Ok(()) => {}
Err(incompatible) => {
trace!(
target: "trie::proof_task",
worker_id,
"Account multiproof batch split due to incompatible caches"
);
pending_incompatible = Some(Box::new(incompatible));
break;
}
}
}
Ok(AccountWorkerJob::BlindedAccountNode {
path,
result_sender,
}) => {
// Defer blinded node jobs to process after batched proofs.
deferred_blinded_nodes.push((path, result_sender));
// Stop batching if too many blinded nodes are deferred to
// prevent starvation.
if deferred_blinded_nodes.len() >= MAX_DEFERRED_BLINDED_NODES {
break;
}
}
Err(_) => break,
}
}
let batch_size = batch.senders.len();
batch_metrics.record_batch_size(batch_size);
let (merged_input, senders) = batch.into_input();
trace!(
target: "trie::proof_task",
worker_id,
batch_size,
targets_len = merged_input.targets.len(),
"Processing batched account multiproof"
);
Self::process_batched_account_multiproof(
worker_id,
&proof_tx,
&storage_work_tx,
merged_input,
senders,
&mut account_proofs_processed,
&mut cursor_metrics_cache,
);
// If we encountered an incompatible job, process it as its own batch
// before handling any deferred blinded node requests.
if let Some(incompatible_job) = pending_incompatible {
next_account_job = Some(incompatible_job);
}
}
// Process any deferred blinded node jobs.
for (path, result_sender) in std::mem::take(&mut deferred_blinded_nodes) {
Self::process_blinded_node(
worker_id,
&proof_tx,
path,
result_sender,
&mut account_nodes_processed,
);
}
Self::process_account_multiproof(
worker_id,
&proof_tx,
storage_work_tx.clone(),
*input,
&mut account_proofs_processed,
&mut cursor_metrics_cache,
);
}
AccountWorkerJob::BlindedAccountNode { path, result_sender } => {
@@ -1553,16 +1074,12 @@ where
Ok(())
}
/// Processes a batched account multiproof request and sends results to all waiting receivers.
///
/// This computes a single account multiproof with merged targets and sends the same result
/// to all original requestors, reducing redundant trie traversals.
fn process_batched_account_multiproof<Provider>(
/// Processes an account multiproof request.
fn process_account_multiproof<Provider>(
worker_id: usize,
proof_tx: &ProofTaskTx<Provider>,
storage_work_tx: &CrossbeamSender<StorageWorkerJob>,
storage_work_tx: CrossbeamSender<StorageWorkerJob>,
input: AccountMultiproofInput,
senders: Vec<ProofResultContext>,
account_proofs_processed: &mut u64,
cursor_metrics_cache: &mut ProofTaskCursorMetricsCache,
) where
@@ -1574,21 +1091,21 @@ where
collect_branch_node_masks,
multi_added_removed_keys,
missed_leaves_storage_roots,
proof_result_sender: _, // We use the senders vec instead
proof_result_sender:
ProofResultContext { sender: result_tx, sequence_number: seq, state, start_time: start },
} = input;
let span = debug_span!(
target: "trie::proof_task",
"Batched account multiproof calculation",
"Account multiproof calculation",
targets = targets.len(),
batch_size = senders.len(),
worker_id,
);
let _span_guard = span.enter();
trace!(
target: "trie::proof_task",
"Processing batched account multiproof"
"Processing account multiproof"
);
let proof_start = Instant::now();
@@ -1603,7 +1120,7 @@ where
tracker.set_precomputed_storage_roots(storage_root_targets_len as u64);
let storage_proof_receivers = match dispatch_storage_proofs(
storage_work_tx,
&storage_work_tx,
&targets,
&mut storage_prefix_sets,
collect_branch_node_masks,
@@ -1611,17 +1128,14 @@ where
) {
Ok(receivers) => receivers,
Err(error) => {
// Send error to all receivers
let error_msg = error.to_string();
for ProofResultContext { sender, sequence_number, state, start_time } in senders {
*account_proofs_processed += 1;
let _ = sender.send(ProofResultMessage {
sequence_number,
result: Err(ParallelStateRootError::Other(error_msg.clone())),
elapsed: start_time.elapsed(),
state,
});
}
// Send error through result channel
error!(target: "trie::proof_task", "Failed to dispatch storage proofs: {error}");
let _ = result_tx.send(ProofResultMessage {
sequence_number: seq,
result: Err(error),
elapsed: start.elapsed(),
state,
});
return;
}
};
@@ -1642,75 +1156,46 @@ where
build_account_multiproof_with_storage_roots(&proof_tx.provider, ctx, &mut tracker);
let proof_elapsed = proof_start.elapsed();
let total_elapsed = start.elapsed();
let proof_cursor_metrics = tracker.cursor_metrics;
proof_cursor_metrics.record_spans();
let stats = tracker.finish();
let result = result.map(|proof| ProofResult::AccountMultiproof { proof, stats });
*account_proofs_processed += 1;
// Send the result to all waiting receivers.
let num_senders = senders.len();
match result {
Ok(proof) => {
// Success case: wrap proof in Arc for efficient sharing across all senders.
let proof_result = ProofResult::AccountMultiproof { proof: Arc::new(proof), stats };
for ProofResultContext { sender, sequence_number, state, start_time } in senders {
*account_proofs_processed += 1;
if sender
.send(ProofResultMessage {
sequence_number,
result: Ok(proof_result.clone()),
elapsed: start_time.elapsed(),
state,
})
.is_err()
{
trace!(
target: "trie::proof_task",
worker_id,
sequence_number,
"Account multiproof receiver dropped, discarding result"
);
}
}
}
Err(error) => {
// Error case: convert to string for cloning, then send to all receivers.
let error_msg = error.to_string();
for ProofResultContext { sender, sequence_number, state, start_time } in senders {
*account_proofs_processed += 1;
if sender
.send(ProofResultMessage {
sequence_number,
result: Err(ParallelStateRootError::Other(error_msg.clone())),
elapsed: start_time.elapsed(),
state,
})
.is_err()
{
trace!(
target: "trie::proof_task",
worker_id,
sequence_number,
"Account multiproof receiver dropped, discarding result"
);
}
}
}
// Send result to MultiProofTask
if result_tx
.send(ProofResultMessage {
sequence_number: seq,
result,
elapsed: total_elapsed,
state,
})
.is_err()
{
trace!(
target: "trie::proof_task",
worker_id,
account_proofs_processed,
"Account multiproof receiver dropped, discarding result"
);
}
trace!(
target: "trie::proof_task",
proof_time_us = proof_elapsed.as_micros(),
num_senders,
total_elapsed_us = total_elapsed.as_micros(),
total_processed = account_proofs_processed,
account_trie_cursor_duration_us = proof_cursor_metrics.account_trie_cursor.total_duration.as_micros(),
account_hashed_cursor_duration_us = proof_cursor_metrics.account_hashed_cursor.total_duration.as_micros(),
storage_trie_cursor_duration_us = proof_cursor_metrics.storage_trie_cursor.total_duration.as_micros(),
storage_hashed_cursor_duration_us = proof_cursor_metrics.storage_hashed_cursor.total_duration.as_micros(),
"Batched account multiproof completed"
account_trie_cursor_metrics = ?proof_cursor_metrics.account_trie_cursor,
account_hashed_cursor_metrics = ?proof_cursor_metrics.account_hashed_cursor,
storage_trie_cursor_metrics = ?proof_cursor_metrics.storage_trie_cursor,
storage_hashed_cursor_metrics = ?proof_cursor_metrics.storage_hashed_cursor,
"Account multiproof completed"
);
#[cfg(feature = "metrics")]
@@ -1853,9 +1338,7 @@ where
drop(_guard);
// Extract storage proof from the result.
// The proof is Arc-wrapped for efficient batch sharing, so we unwrap it
// here.
// Extract storage proof from the result
let proof = match proof_msg.result? {
ProofResult::StorageProof { hashed_address: addr, proof } => {
debug_assert_eq!(
@@ -1863,9 +1346,7 @@ where
hashed_address,
"storage worker must return same address: expected {hashed_address}, got {addr}"
);
// Efficiently unwrap Arc: returns inner value if sole owner, clones
// otherwise.
Arc::try_unwrap(proof).unwrap_or_else(|arc| (*arc).clone())
proof
}
ProofResult::AccountMultiproof { .. } => {
unreachable!("storage worker only sends StorageProof variant")
@@ -1928,11 +1409,8 @@ where
// Consume remaining storage proof receivers for accounts not encountered during trie walk.
for (hashed_address, receiver) in storage_proof_receivers {
if let Ok(proof_msg) = receiver.recv() {
// Extract storage proof from the result.
// The proof is Arc-wrapped for efficient batch sharing, so we unwrap it here.
// Extract storage proof from the result
if let Ok(ProofResult::StorageProof { proof, .. }) = proof_msg.result {
// Efficiently unwrap Arc: returns inner value if sole owner, clones otherwise.
let proof = Arc::try_unwrap(proof).unwrap_or_else(|arc| (*arc).clone());
collected_decoded_storages.insert(hashed_address, proof);
}
}

View File

@@ -6,7 +6,7 @@ Gets the content of a database table for the given key
$ op-reth db get mdbx --help
```
```txt
Usage: op-reth db get mdbx [OPTIONS] <TABLE> <KEY> [SUBKEY]
Usage: op-reth db get mdbx [OPTIONS] <TABLE> <KEY> [SUBKEY] [END_KEY] [END_SUBKEY]
Arguments:
<TABLE>
@@ -18,6 +18,12 @@ Arguments:
[SUBKEY]
The subkey to get content for
[END_KEY]
Optional end key for range query (exclusive upper bound)
[END_SUBKEY]
Optional end subkey for range query (exclusive upper bound)
Options:
--raw
Output bytes instead of human-readable decoded value

View File

@@ -6,7 +6,7 @@ Gets the content of a database table for the given key
$ reth db get mdbx --help
```
```txt
Usage: reth db get mdbx [OPTIONS] <TABLE> <KEY> [SUBKEY]
Usage: reth db get mdbx [OPTIONS] <TABLE> <KEY> [SUBKEY] [END_KEY] [END_SUBKEY]
Arguments:
<TABLE>
@@ -18,6 +18,12 @@ Arguments:
[SUBKEY]
The subkey to get content for
[END_KEY]
Optional end key for range query (exclusive upper bound)
[END_SUBKEY]
Optional end subkey for range query (exclusive upper bound)
Options:
--raw
Output bytes instead of human-readable decoded value

View File

@@ -114,6 +114,54 @@ These include general information about the node itself, as well as what protoco
}
```
## `admin_peers`
Returns information about peers currently known to the node.
| Client | Method invocation |
| ------ | ------------------------------ |
| RPC | `{"method": "admin_peers", "params": []}` |
### Example
```js
// > {"jsonrpc":"2.0","id":1,"method":"admin_peers","params":[]}
{"jsonrpc":"2.0","id":1,"result":[
{
"id":"44826a5d6a55f88a18298bca4773fca...",
"name":"reth/v0.0.1/x86_64-unknown-linux-gnu",
"enode":"enode://44826a5d6a55f88a18298bca4773fca5749cdc3a5c9f308aa7d810e9b31123f3e7c5fba0b1d70aac5308426f47df2a128a6747040a3815cc7dd7167d03be320d@192.168.1.1:30303",
"enr":"enr:-IS4QHCYr...",
"caps":["eth/67"],
"network":{
"remoteAddress":"192.168.1.1:30303",
"localAddress":"127.0.0.1:30303",
"inbound":false,
"trusted":false,
"staticNode":false
},
"protocols":{
"eth":{"version":67}
}
}
]}
```
## `admin_clearTxpool`
Clears all transactions from the transaction pool. Returns the number of removed transactions.
| Client | Method invocation |
| ------ | ----------------------------------------- |
| RPC | `{"method": "admin_clearTxpool", "params": []}` |
### Example
```js
// > {"jsonrpc":"2.0","id":1,"method":"admin_clearTxpool","params":[]}
{"jsonrpc":"2.0","id":1,"result":42}
```
## `admin_peerEvents`, `admin_peerEvents_unsubscribe`
Subscribe to events received by peers over the network. This creates a subscription that emits notifications about peer connections and disconnections.

View File

@@ -176,9 +176,13 @@ The second parameter is an array of one or more trace types (`vmTrace`, `trace`,
The third and optional parameter is a block number, block hash, or a block tag (`latest`, `finalized`, `safe`, `earliest`, `pending`).
| Client | Method invocation |
| ------ | -------------------------------------------------------------- |
| RPC | `{"method": "trace_call", "params": [tx, trace[], block]}` |
The fourth and optional parameter is a `stateOverrides` object that temporarily overrides account state used for the trace (balances, nonces, code, storage).
The fifth and optional parameter is a `blockOverrides` object that temporarily overrides block fields used for the trace (for example `timestamp`, `baseFee`, `number`).
| Client | Method invocation |
| ------ | ------------------------------------------------------------------------------------ |
| RPC | `{"method": "trace_call", "params": [tx, trace[], block, stateOverrides, blockOverrides]}` |
### Example

View File

@@ -406,9 +406,10 @@ No pruning, run as archive node.
This configuration will:
- Run pruning every 5 blocks
- Continuously prune all transaction senders, account history and storage history before the block `head-100_000`,
- Continuously prune all transaction senders, account history, storage history and bodies history before the block `head-100_000`,
i.e. keep the data for the last `100_000` blocks
- Prune all receipts before the block 1920000, i.e. keep receipts from the block 1920000
- Keep the last 128 blocks of merkle changesets (default behavior)
```toml
[prune]
@@ -430,6 +431,14 @@ account_history = { distance = 100_000 } # Prune all historical account states b
# Storage History pruning configuration
storage_history = { distance = 100_000 } # Prune all historical storage states before the block `head-100000`
# Bodies History pruning configuration
bodies_history = { distance = 100_000 } # Prune all historical block bodies before the block `head-100000`
# Merkle Changesets pruning configuration
# Controls pruning of AccountsTrieChangeSets and StoragesTrieChangeSets.
# Default: { distance = 128 } - keeps the last 128 blocks of merkle changesets
merkle_changesets = { distance = 128 }
```
We can also prune receipts more granular, using the logs filtering:

View File

@@ -0,0 +1,14 @@
[package]
name = "example-custom-rpc-middleware"
version = "0.0.0"
publish = false
edition.workspace = true
license.workspace = true
[dependencies]
reth-ethereum = { workspace = true, features = ["node", "rpc", "cli"] }
clap = { workspace = true, features = ["derive"] }
jsonrpsee = { workspace = true, features = ["server", "macros"] }
tracing.workspace = true
tower.workspace = true

View File

@@ -0,0 +1,117 @@
//! Example of how to create a node with custom middleware that alters a returned error message from
//! the RPC
//!
//! Run with
//!
//! ```sh
//! cargo run -p example-custom-rpc-middleware node --http --dev --dev.block-time 12s --http.api=debug,eth
//! ```
//!
//! Then make an RPC request that will result in an error
//!
//! ```sh
//! curl -s -X POST http://localhost:8545 \
//! -H "Content-Type: application/json" \
//! -d '{
//! "jsonrpc": "2.0",
//! "method": "debug_getRawBlock",
//! "params": ["2"],
//! "id": 1
//! }' | jq
//! ```
use clap::Parser;
use jsonrpsee::{
core::{
middleware::{Batch, Notification, RpcServiceT},
server::MethodResponse,
},
types::{ErrorObjectOwned, Id, Request},
};
use reth_ethereum::{
cli::{chainspec::EthereumChainSpecParser, interface::Cli},
node::{EthereumAddOns, EthereumNode},
};
use tower::Layer;
fn main() {
Cli::<EthereumChainSpecParser>::parse()
.run(|builder, _| async move {
let handle = builder
.with_types::<EthereumNode>()
.with_components(EthereumNode::components())
.with_add_ons(
//create ethereum addons with our custom rpc middleware
EthereumAddOns::default().with_rpc_middleware(ResponseMutationLayer),
)
.launch_with_debug_capabilities()
.await?;
handle.wait_for_node_exit().await
})
.unwrap();
}
#[derive(Clone)]
pub struct ResponseMutationLayer;
impl<S> Layer<S> for ResponseMutationLayer {
type Service = ResponseMutationService<S>;
fn layer(&self, inner: S) -> Self::Service {
ResponseMutationService { service: inner }
}
}
#[derive(Clone)]
pub struct ResponseMutationService<S> {
service: S,
}
impl<S> RpcServiceT for ResponseMutationService<S>
where
S: RpcServiceT<
MethodResponse = jsonrpsee::MethodResponse,
BatchResponse = jsonrpsee::MethodResponse,
NotificationResponse = jsonrpsee::MethodResponse,
> + Send
+ Sync
+ Clone
+ 'static,
{
type MethodResponse = S::MethodResponse;
type NotificationResponse = S::NotificationResponse;
type BatchResponse = S::BatchResponse;
fn call<'a>(&self, req: Request<'a>) -> impl Future<Output = Self::MethodResponse> + Send + 'a {
tracing::info!("processed call {:?}", req);
let service = self.service.clone();
Box::pin(async move {
let resp = service.call(req).await;
//we can modify the response with our own custom error
if resp.is_error() {
let err = ErrorObjectOwned::owned(
-31404,
"CustomError",
Some("Our very own custom error message"),
);
return MethodResponse::error(Id::Number(1), err);
}
//otherwise just return the original response
resp
})
}
fn batch<'a>(&self, req: Batch<'a>) -> impl Future<Output = Self::BatchResponse> + Send + 'a {
self.service.batch(req)
}
fn notification<'a>(
&self,
n: Notification<'a>,
) -> impl Future<Output = Self::NotificationResponse> + Send + 'a {
self.service.notification(n)
}
}