mirror of
https://github.com/paradigmxyz/reth.git
synced 2026-04-30 03:01:58 -04:00
Compare commits
46 Commits
yk/worker-
...
yk/batch-c
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
7e80c3cac2 | ||
|
|
6e8d62617d | ||
|
|
866b8ded5f | ||
|
|
51985e249c | ||
|
|
5ac911b707 | ||
|
|
e9a5a11a9f | ||
|
|
7dd14651e4 | ||
|
|
51ef406b94 | ||
|
|
a6b9472d1c | ||
|
|
6636d2a2ad | ||
|
|
ab6854d159 | ||
|
|
5a274fc939 | ||
|
|
c9431b224b | ||
|
|
8cbfd91db0 | ||
|
|
43f9942ba7 | ||
|
|
06adc3ee0c | ||
|
|
fbf6be4cf2 | ||
|
|
21d61d40d1 | ||
|
|
cf7d709358 | ||
|
|
e9355caba5 | ||
|
|
fdd9d5bb40 | ||
|
|
9eeba7e6b3 | ||
|
|
0085acc868 | ||
|
|
c697147f90 | ||
|
|
7388d6636d | ||
|
|
0b859c0735 | ||
|
|
a8e0606fa7 | ||
|
|
969689d9b6 | ||
|
|
ad2081493a | ||
|
|
abfb6d3965 | ||
|
|
0f0eb7a531 | ||
|
|
4f1e486b4f | ||
|
|
05307d088c | ||
|
|
245cca7ce2 | ||
|
|
28d6996fc4 | ||
|
|
0eaffdf489 | ||
|
|
9c141cac4b | ||
|
|
fc6ab35c5c | ||
|
|
f88bf4e427 | ||
|
|
3d330caf36 | ||
|
|
5a43e77771 | ||
|
|
5b3c479ed5 | ||
|
|
dc06b47abe | ||
|
|
e9cd7cc003 | ||
|
|
f633efc969 | ||
|
|
2f55b1c30f |
7
.github/actionlint.yaml
vendored
Normal file
7
.github/actionlint.yaml
vendored
Normal file
@@ -0,0 +1,7 @@
|
||||
self-hosted-runner:
|
||||
labels:
|
||||
- depot-ubuntu-latest
|
||||
- depot-ubuntu-latest-2
|
||||
- depot-ubuntu-latest-4
|
||||
- depot-ubuntu-latest-8
|
||||
- depot-ubuntu-latest-16
|
||||
2
.github/workflows/bench.yml
vendored
2
.github/workflows/bench.yml
vendored
@@ -15,7 +15,7 @@ env:
|
||||
name: bench
|
||||
jobs:
|
||||
codspeed:
|
||||
runs-on: ubuntu-latest
|
||||
runs-on: depot-ubuntu-latest
|
||||
steps:
|
||||
- uses: actions/checkout@v6
|
||||
with:
|
||||
|
||||
2
.github/workflows/book.yml
vendored
2
.github/workflows/book.yml
vendored
@@ -12,7 +12,7 @@ on:
|
||||
|
||||
jobs:
|
||||
build:
|
||||
runs-on: ubuntu-latest
|
||||
runs-on: depot-ubuntu-latest-16
|
||||
timeout-minutes: 90
|
||||
steps:
|
||||
- name: Checkout
|
||||
|
||||
2
.github/workflows/compact.yml
vendored
2
.github/workflows/compact.yml
vendored
@@ -17,7 +17,7 @@ env:
|
||||
name: compact-codec
|
||||
jobs:
|
||||
compact-codec:
|
||||
runs-on: ubuntu-latest
|
||||
runs-on: depot-ubuntu-latest-16
|
||||
strategy:
|
||||
matrix:
|
||||
bin:
|
||||
|
||||
2
.github/workflows/e2e.yml
vendored
2
.github/workflows/e2e.yml
vendored
@@ -19,7 +19,7 @@ concurrency:
|
||||
jobs:
|
||||
test:
|
||||
name: e2e-testsuite
|
||||
runs-on: ubuntu-latest
|
||||
runs-on: depot-ubuntu-latest-16
|
||||
env:
|
||||
RUST_BACKTRACE: 1
|
||||
timeout-minutes: 90
|
||||
|
||||
6
.github/workflows/hive.yml
vendored
6
.github/workflows/hive.yml
vendored
@@ -24,7 +24,7 @@ jobs:
|
||||
prepare-hive:
|
||||
if: github.repository == 'paradigmxyz/reth'
|
||||
timeout-minutes: 45
|
||||
runs-on: ubuntu-latest
|
||||
runs-on: depot-ubuntu-latest
|
||||
steps:
|
||||
- uses: actions/checkout@v6
|
||||
- name: Checkout hive tests
|
||||
@@ -178,7 +178,7 @@ jobs:
|
||||
- prepare-reth
|
||||
- prepare-hive
|
||||
name: run ${{ matrix.scenario.sim }}${{ matrix.scenario.limit && format(' - {0}', matrix.scenario.limit) }}
|
||||
runs-on: ubuntu-latest
|
||||
runs-on: depot-ubuntu-latest
|
||||
permissions:
|
||||
issues: write
|
||||
steps:
|
||||
@@ -245,7 +245,7 @@ jobs:
|
||||
notify-on-error:
|
||||
needs: test
|
||||
if: failure()
|
||||
runs-on: ubuntu-latest
|
||||
runs-on: depot-ubuntu-latest
|
||||
steps:
|
||||
- name: Slack Webhook Action
|
||||
uses: rtCamp/action-slack-notify@v2
|
||||
|
||||
2
.github/workflows/integration.yml
vendored
2
.github/workflows/integration.yml
vendored
@@ -23,7 +23,7 @@ jobs:
|
||||
test:
|
||||
name: test / ${{ matrix.network }}
|
||||
if: github.event_name != 'schedule'
|
||||
runs-on: ubuntu-latest
|
||||
runs-on: depot-ubuntu-latest-16
|
||||
env:
|
||||
RUST_BACKTRACE: 1
|
||||
strategy:
|
||||
|
||||
4
.github/workflows/kurtosis-op.yml
vendored
4
.github/workflows/kurtosis-op.yml
vendored
@@ -32,7 +32,7 @@ jobs:
|
||||
strategy:
|
||||
fail-fast: false
|
||||
name: run kurtosis
|
||||
runs-on: ubuntu-latest
|
||||
runs-on: depot-ubuntu-latest
|
||||
needs:
|
||||
- prepare-reth
|
||||
steps:
|
||||
@@ -85,7 +85,7 @@ jobs:
|
||||
notify-on-error:
|
||||
needs: test
|
||||
if: failure()
|
||||
runs-on: ubuntu-latest
|
||||
runs-on: depot-ubuntu-latest
|
||||
steps:
|
||||
- name: Slack Webhook Action
|
||||
uses: rtCamp/action-slack-notify@v2
|
||||
|
||||
4
.github/workflows/kurtosis.yml
vendored
4
.github/workflows/kurtosis.yml
vendored
@@ -30,7 +30,7 @@ jobs:
|
||||
strategy:
|
||||
fail-fast: false
|
||||
name: run kurtosis
|
||||
runs-on: ubuntu-latest
|
||||
runs-on: depot-ubuntu-latest
|
||||
needs:
|
||||
- prepare-reth
|
||||
steps:
|
||||
@@ -58,7 +58,7 @@ jobs:
|
||||
notify-on-error:
|
||||
needs: test
|
||||
if: failure()
|
||||
runs-on: ubuntu-latest
|
||||
runs-on: depot-ubuntu-latest
|
||||
steps:
|
||||
- name: Slack Webhook Action
|
||||
uses: rtCamp/action-slack-notify@v2
|
||||
|
||||
40
.github/workflows/lint.yml
vendored
40
.github/workflows/lint.yml
vendored
@@ -12,7 +12,7 @@ env:
|
||||
jobs:
|
||||
clippy-binaries:
|
||||
name: clippy binaries / ${{ matrix.type }}
|
||||
runs-on: ubuntu-latest
|
||||
runs-on: depot-ubuntu-latest
|
||||
timeout-minutes: 30
|
||||
strategy:
|
||||
matrix:
|
||||
@@ -40,7 +40,7 @@ jobs:
|
||||
|
||||
clippy:
|
||||
name: clippy
|
||||
runs-on: ubuntu-latest
|
||||
runs-on: depot-ubuntu-latest-16
|
||||
timeout-minutes: 30
|
||||
steps:
|
||||
- uses: actions/checkout@v6
|
||||
@@ -56,7 +56,7 @@ jobs:
|
||||
RUSTFLAGS: -D warnings
|
||||
|
||||
wasm:
|
||||
runs-on: ubuntu-latest
|
||||
runs-on: depot-ubuntu-latest
|
||||
timeout-minutes: 30
|
||||
steps:
|
||||
- uses: actions/checkout@v6
|
||||
@@ -75,7 +75,7 @@ jobs:
|
||||
.github/assets/check_wasm.sh
|
||||
|
||||
riscv:
|
||||
runs-on: ubuntu-latest
|
||||
runs-on: depot-ubuntu-latest
|
||||
timeout-minutes: 60
|
||||
steps:
|
||||
- uses: actions/checkout@v6
|
||||
@@ -93,7 +93,7 @@ jobs:
|
||||
|
||||
crate-checks:
|
||||
name: crate-checks (${{ matrix.partition }}/${{ matrix.total_partitions }})
|
||||
runs-on: ubuntu-latest
|
||||
runs-on: depot-ubuntu-latest-16
|
||||
strategy:
|
||||
matrix:
|
||||
partition: [1, 2]
|
||||
@@ -111,7 +111,7 @@ jobs:
|
||||
|
||||
msrv:
|
||||
name: MSRV
|
||||
runs-on: ubuntu-latest
|
||||
runs-on: depot-ubuntu-latest
|
||||
timeout-minutes: 30
|
||||
strategy:
|
||||
matrix:
|
||||
@@ -133,7 +133,7 @@ jobs:
|
||||
|
||||
docs:
|
||||
name: docs
|
||||
runs-on: ubuntu-latest
|
||||
runs-on: depot-ubuntu-latest-16
|
||||
timeout-minutes: 30
|
||||
steps:
|
||||
- uses: actions/checkout@v6
|
||||
@@ -150,7 +150,7 @@ jobs:
|
||||
|
||||
fmt:
|
||||
name: fmt
|
||||
runs-on: ubuntu-latest
|
||||
runs-on: depot-ubuntu-latest
|
||||
timeout-minutes: 30
|
||||
steps:
|
||||
- uses: actions/checkout@v6
|
||||
@@ -163,7 +163,7 @@ jobs:
|
||||
|
||||
udeps:
|
||||
name: udeps
|
||||
runs-on: ubuntu-latest
|
||||
runs-on: depot-ubuntu-latest-16
|
||||
timeout-minutes: 30
|
||||
steps:
|
||||
- uses: actions/checkout@v6
|
||||
@@ -177,7 +177,7 @@ jobs:
|
||||
|
||||
book:
|
||||
name: book
|
||||
runs-on: ubuntu-latest
|
||||
runs-on: depot-ubuntu-latest-16
|
||||
timeout-minutes: 30
|
||||
steps:
|
||||
- uses: actions/checkout@v6
|
||||
@@ -232,10 +232,14 @@ jobs:
|
||||
- name: Ensure no arbitrary or proptest dependency on default build
|
||||
run: cargo tree --package reth -e=features,no-dev | grep -Eq "arbitrary|proptest" && exit 1 || exit 0
|
||||
|
||||
# Checks that selected rates can compile with power set of features
|
||||
# Checks that selected crates can compile with power set of features
|
||||
features:
|
||||
name: features
|
||||
runs-on: ubuntu-latest
|
||||
name: features (${{ matrix.partition }}/${{ matrix.total_partitions }})
|
||||
runs-on: depot-ubuntu-latest-16
|
||||
strategy:
|
||||
matrix:
|
||||
partition: [1, 2]
|
||||
total_partitions: [2]
|
||||
timeout-minutes: 30
|
||||
steps:
|
||||
- uses: actions/checkout@v6
|
||||
@@ -246,13 +250,19 @@ jobs:
|
||||
cache-on-failure: true
|
||||
- name: cargo install cargo-hack
|
||||
uses: taiki-e/install-action@cargo-hack
|
||||
- run: make check-features
|
||||
- run: |
|
||||
cargo hack check \
|
||||
--package reth-codecs \
|
||||
--package reth-primitives-traits \
|
||||
--package reth-primitives \
|
||||
--feature-powerset \
|
||||
--partition ${{ matrix.partition }}/${{ matrix.total_partitions }}
|
||||
env:
|
||||
RUSTFLAGS: -D warnings
|
||||
|
||||
# Check crates correctly propagate features
|
||||
feature-propagation:
|
||||
runs-on: ubuntu-latest
|
||||
runs-on: depot-ubuntu-latest
|
||||
timeout-minutes: 20
|
||||
steps:
|
||||
- uses: actions/checkout@v6
|
||||
|
||||
2
.github/workflows/prepare-reth.yml
vendored
2
.github/workflows/prepare-reth.yml
vendored
@@ -26,7 +26,7 @@ jobs:
|
||||
prepare-reth:
|
||||
if: github.repository == 'paradigmxyz/reth'
|
||||
timeout-minutes: 45
|
||||
runs-on: ubuntu-latest
|
||||
runs-on: depot-ubuntu-latest
|
||||
steps:
|
||||
- uses: actions/checkout@v6
|
||||
- run: mkdir artifacts
|
||||
|
||||
2
.github/workflows/release-dist.yml
vendored
2
.github/workflows/release-dist.yml
vendored
@@ -12,7 +12,7 @@ jobs:
|
||||
runs-on: ubuntu-latest
|
||||
steps:
|
||||
- name: Update Homebrew formula
|
||||
uses: dawidd6/action-homebrew-bump-formula@v6
|
||||
uses: dawidd6/action-homebrew-bump-formula@v7
|
||||
with:
|
||||
token: ${{ secrets.HOMEBREW }}
|
||||
no_fork: true
|
||||
|
||||
2
.github/workflows/stage.yml
vendored
2
.github/workflows/stage.yml
vendored
@@ -22,7 +22,7 @@ jobs:
|
||||
name: stage-run-test
|
||||
# Only run stage commands test in merge groups
|
||||
if: github.event_name == 'merge_group'
|
||||
runs-on: ubuntu-latest
|
||||
runs-on: depot-ubuntu-latest
|
||||
env:
|
||||
RUST_LOG: info,sync=error
|
||||
RUST_BACKTRACE: 1
|
||||
|
||||
2
.github/workflows/sync-era.yml
vendored
2
.github/workflows/sync-era.yml
vendored
@@ -17,7 +17,7 @@ concurrency:
|
||||
jobs:
|
||||
sync:
|
||||
name: sync (${{ matrix.chain.bin }})
|
||||
runs-on: ubuntu-latest
|
||||
runs-on: depot-ubuntu-latest
|
||||
env:
|
||||
RUST_LOG: info,sync=error
|
||||
RUST_BACKTRACE: 1
|
||||
|
||||
2
.github/workflows/sync.yml
vendored
2
.github/workflows/sync.yml
vendored
@@ -17,7 +17,7 @@ concurrency:
|
||||
jobs:
|
||||
sync:
|
||||
name: sync (${{ matrix.chain.bin }})
|
||||
runs-on: ubuntu-latest
|
||||
runs-on: depot-ubuntu-latest
|
||||
env:
|
||||
RUST_LOG: info,sync=error
|
||||
RUST_BACKTRACE: 1
|
||||
|
||||
6
.github/workflows/unit.yml
vendored
6
.github/workflows/unit.yml
vendored
@@ -19,7 +19,7 @@ concurrency:
|
||||
jobs:
|
||||
test:
|
||||
name: test / ${{ matrix.type }} (${{ matrix.partition }}/${{ matrix.total_partitions }})
|
||||
runs-on: ubuntu-latest
|
||||
runs-on: depot-ubuntu-latest-16
|
||||
env:
|
||||
RUST_BACKTRACE: 1
|
||||
strategy:
|
||||
@@ -64,7 +64,7 @@ jobs:
|
||||
|
||||
state:
|
||||
name: Ethereum state tests
|
||||
runs-on: ubuntu-latest
|
||||
runs-on: depot-ubuntu-latest-4
|
||||
env:
|
||||
RUST_LOG: info,sync=error
|
||||
RUST_BACKTRACE: 1
|
||||
@@ -98,7 +98,7 @@ jobs:
|
||||
|
||||
doc:
|
||||
name: doc tests
|
||||
runs-on: ubuntu-latest
|
||||
runs-on: depot-ubuntu-latest-16
|
||||
env:
|
||||
RUST_BACKTRACE: 1
|
||||
timeout-minutes: 30
|
||||
|
||||
4
.github/workflows/windows.yml
vendored
4
.github/workflows/windows.yml
vendored
@@ -11,7 +11,7 @@ on:
|
||||
|
||||
jobs:
|
||||
check-reth:
|
||||
runs-on: ubuntu-24.04
|
||||
runs-on: depot-ubuntu-latest-16
|
||||
timeout-minutes: 60
|
||||
|
||||
steps:
|
||||
@@ -30,7 +30,7 @@ jobs:
|
||||
run: cargo check --target x86_64-pc-windows-gnu
|
||||
|
||||
check-op-reth:
|
||||
runs-on: ubuntu-24.04
|
||||
runs-on: depot-ubuntu-latest-16
|
||||
timeout-minutes: 60
|
||||
|
||||
steps:
|
||||
|
||||
11
Cargo.lock
generated
11
Cargo.lock
generated
@@ -3671,6 +3671,17 @@ dependencies = [
|
||||
"tracing",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "example-custom-rpc-middleware"
|
||||
version = "0.0.0"
|
||||
dependencies = [
|
||||
"clap",
|
||||
"jsonrpsee",
|
||||
"reth-ethereum",
|
||||
"tower",
|
||||
"tracing",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "example-db-access"
|
||||
version = "0.0.0"
|
||||
|
||||
@@ -153,6 +153,7 @@ members = [
|
||||
"examples/custom-node-components/",
|
||||
"examples/custom-payload-builder/",
|
||||
"examples/custom-rlpx-subprotocol",
|
||||
"examples/custom-rpc-middleware",
|
||||
"examples/custom-node",
|
||||
"examples/db-access",
|
||||
"examples/engine-api-access",
|
||||
|
||||
5
Makefile
5
Makefile
@@ -523,8 +523,3 @@ pr:
|
||||
make test
|
||||
|
||||
check-features:
|
||||
cargo hack check \
|
||||
--package reth-codecs \
|
||||
--package reth-primitives-traits \
|
||||
--package reth-primitives \
|
||||
--feature-powerset
|
||||
|
||||
@@ -506,8 +506,8 @@ async fn run_warmup_phase(
|
||||
// Build additional args with conditional --debug.startup-sync-state-idle flag
|
||||
let additional_args = args.build_additional_args("warmup", args.baseline_args.as_ref());
|
||||
|
||||
// Start reth node for warmup
|
||||
let mut node_process =
|
||||
// Start reth node for warmup (command is not stored for warmup phase)
|
||||
let (mut node_process, _warmup_command) =
|
||||
node_manager.start_node(&binary_path, warmup_ref, "warmup", &additional_args).await?;
|
||||
|
||||
// Wait for node to be ready and get its current tip
|
||||
@@ -607,8 +607,8 @@ async fn run_benchmark_workflow(
|
||||
// Build additional args with conditional --debug.startup-sync-state-idle flag
|
||||
let additional_args = args.build_additional_args(ref_type, base_args_str);
|
||||
|
||||
// Start reth node
|
||||
let mut node_process =
|
||||
// Start reth node and capture the command for reporting
|
||||
let (mut node_process, reth_command) =
|
||||
node_manager.start_node(&binary_path, git_ref, ref_type, &additional_args).await?;
|
||||
|
||||
// Wait for node to be ready and get its current tip (wherever it is)
|
||||
@@ -645,8 +645,9 @@ async fn run_benchmark_workflow(
|
||||
// Store results for comparison
|
||||
comparison_generator.add_ref_results(ref_type, &output_dir)?;
|
||||
|
||||
// Set the benchmark run timestamps
|
||||
// Set the benchmark run timestamps and reth command
|
||||
comparison_generator.set_ref_timestamps(ref_type, benchmark_start, benchmark_end)?;
|
||||
comparison_generator.set_ref_command(ref_type, reth_command)?;
|
||||
|
||||
info!("Completed {} reference benchmark", ref_type);
|
||||
}
|
||||
|
||||
@@ -21,6 +21,8 @@ pub(crate) struct ComparisonGenerator {
|
||||
feature_ref_name: String,
|
||||
baseline_results: Option<BenchmarkResults>,
|
||||
feature_results: Option<BenchmarkResults>,
|
||||
baseline_command: Option<String>,
|
||||
feature_command: Option<String>,
|
||||
}
|
||||
|
||||
/// Represents the results from a single benchmark run
|
||||
@@ -57,7 +59,6 @@ pub(crate) struct TotalGasRow {
|
||||
/// - `mean_new_payload_latency_ms`: arithmetic mean latency across blocks.
|
||||
/// - `median_new_payload_latency_ms`: p50 latency across blocks.
|
||||
/// - `p90_new_payload_latency_ms` / `p99_new_payload_latency_ms`: tail latencies across blocks.
|
||||
/// - `std_dev_new_payload_latency_ms`: standard deviation of latency across blocks.
|
||||
#[derive(Debug, Clone, Serialize)]
|
||||
pub(crate) struct BenchmarkSummary {
|
||||
pub total_blocks: u64,
|
||||
@@ -67,7 +68,6 @@ pub(crate) struct BenchmarkSummary {
|
||||
pub median_new_payload_latency_ms: f64,
|
||||
pub p90_new_payload_latency_ms: f64,
|
||||
pub p99_new_payload_latency_ms: f64,
|
||||
pub std_dev_new_payload_latency_ms: f64,
|
||||
pub gas_per_second: f64,
|
||||
pub blocks_per_second: f64,
|
||||
pub min_block_number: u64,
|
||||
@@ -91,6 +91,7 @@ pub(crate) struct RefInfo {
|
||||
pub summary: BenchmarkSummary,
|
||||
pub start_timestamp: Option<DateTime<Utc>>,
|
||||
pub end_timestamp: Option<DateTime<Utc>>,
|
||||
pub reth_command: Option<String>,
|
||||
}
|
||||
|
||||
/// Summary of the comparison between references.
|
||||
@@ -98,7 +99,6 @@ pub(crate) struct RefInfo {
|
||||
/// Percent deltas are `(feature - baseline) / baseline * 100`:
|
||||
/// - `new_payload_latency_p50_change_percent` / p90 / p99: percent changes of the respective
|
||||
/// per-block percentiles.
|
||||
/// - `std_dev_change_percent`: percent change in standard deviation of newPayload latency.
|
||||
/// - `per_block_latency_change_mean_percent` / `per_block_latency_change_median_percent` are the
|
||||
/// mean and median of per-block percent deltas (feature vs baseline), capturing block-level
|
||||
/// drift.
|
||||
@@ -117,7 +117,6 @@ pub(crate) struct ComparisonSummary {
|
||||
pub new_payload_latency_p50_change_percent: f64,
|
||||
pub new_payload_latency_p90_change_percent: f64,
|
||||
pub new_payload_latency_p99_change_percent: f64,
|
||||
pub std_dev_change_percent: f64,
|
||||
pub gas_per_second_change_percent: f64,
|
||||
pub blocks_per_second_change_percent: f64,
|
||||
}
|
||||
@@ -146,6 +145,8 @@ impl ComparisonGenerator {
|
||||
feature_ref_name: args.feature_ref.clone(),
|
||||
baseline_results: None,
|
||||
feature_results: None,
|
||||
baseline_command: None,
|
||||
feature_command: None,
|
||||
}
|
||||
}
|
||||
|
||||
@@ -210,6 +211,21 @@ impl ComparisonGenerator {
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Set the reth command for a reference
|
||||
pub(crate) fn set_ref_command(&mut self, ref_type: &str, command: String) -> Result<()> {
|
||||
match ref_type {
|
||||
"baseline" => {
|
||||
self.baseline_command = Some(command);
|
||||
}
|
||||
"feature" => {
|
||||
self.feature_command = Some(command);
|
||||
}
|
||||
_ => return Err(eyre!("Unknown reference type: {}", ref_type)),
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Generate the final comparison report
|
||||
pub(crate) async fn generate_comparison_report(&self) -> Result<()> {
|
||||
info!("Generating comparison report...");
|
||||
@@ -234,12 +250,14 @@ impl ComparisonGenerator {
|
||||
summary: baseline.summary.clone(),
|
||||
start_timestamp: baseline.start_timestamp,
|
||||
end_timestamp: baseline.end_timestamp,
|
||||
reth_command: self.baseline_command.clone(),
|
||||
},
|
||||
feature: RefInfo {
|
||||
ref_name: feature.ref_name.clone(),
|
||||
summary: feature.summary.clone(),
|
||||
start_timestamp: feature.start_timestamp,
|
||||
end_timestamp: feature.end_timestamp,
|
||||
reth_command: self.feature_command.clone(),
|
||||
},
|
||||
comparison_summary,
|
||||
per_block_comparisons,
|
||||
@@ -339,9 +357,6 @@ impl ComparisonGenerator {
|
||||
let mean_new_payload_latency_ms: f64 =
|
||||
latencies_ms.iter().sum::<f64>() / total_blocks as f64;
|
||||
|
||||
let std_dev_new_payload_latency_ms =
|
||||
calculate_std_dev(&latencies_ms, mean_new_payload_latency_ms);
|
||||
|
||||
let mut sorted_latencies_ms = latencies_ms;
|
||||
sorted_latencies_ms.sort_by(|a, b| a.partial_cmp(b).unwrap_or(Ordering::Equal));
|
||||
let median_new_payload_latency_ms = percentile(&sorted_latencies_ms, 0.5);
|
||||
@@ -372,7 +387,6 @@ impl ComparisonGenerator {
|
||||
median_new_payload_latency_ms,
|
||||
p90_new_payload_latency_ms,
|
||||
p99_new_payload_latency_ms,
|
||||
std_dev_new_payload_latency_ms,
|
||||
gas_per_second,
|
||||
blocks_per_second,
|
||||
min_block_number,
|
||||
@@ -440,10 +454,6 @@ impl ComparisonGenerator {
|
||||
baseline.p99_new_payload_latency_ms,
|
||||
feature.p99_new_payload_latency_ms,
|
||||
),
|
||||
std_dev_change_percent: calc_percent_change(
|
||||
baseline.std_dev_new_payload_latency_ms,
|
||||
feature.std_dev_new_payload_latency_ms,
|
||||
),
|
||||
gas_per_second_change_percent: calc_percent_change(
|
||||
baseline.gas_per_second,
|
||||
feature.gas_per_second,
|
||||
@@ -574,7 +584,6 @@ impl ComparisonGenerator {
|
||||
" NewPayload Latency p99: {:+.2}%",
|
||||
summary.new_payload_latency_p99_change_percent
|
||||
);
|
||||
println!(" NewPayload Latency std dev: {:+.2}%", summary.std_dev_change_percent);
|
||||
println!(
|
||||
" Gas/Second: {:+.2}%",
|
||||
summary.gas_per_second_change_percent
|
||||
@@ -597,12 +606,11 @@ impl ComparisonGenerator {
|
||||
);
|
||||
println!(" NewPayload latency (ms):");
|
||||
println!(
|
||||
" mean: {:.2}, p50: {:.2}, p90: {:.2}, p99: {:.2}, std dev: {:.2}",
|
||||
" mean: {:.2}, p50: {:.2}, p90: {:.2}, p99: {:.2}",
|
||||
baseline.mean_new_payload_latency_ms,
|
||||
baseline.median_new_payload_latency_ms,
|
||||
baseline.p90_new_payload_latency_ms,
|
||||
baseline.p99_new_payload_latency_ms,
|
||||
baseline.std_dev_new_payload_latency_ms
|
||||
baseline.p99_new_payload_latency_ms
|
||||
);
|
||||
if let (Some(start), Some(end)) =
|
||||
(&report.baseline.start_timestamp, &report.baseline.end_timestamp)
|
||||
@@ -613,6 +621,9 @@ impl ComparisonGenerator {
|
||||
end.format("%Y-%m-%d %H:%M:%S UTC")
|
||||
);
|
||||
}
|
||||
if let Some(ref cmd) = report.baseline.reth_command {
|
||||
println!(" Command: {}", cmd);
|
||||
}
|
||||
println!();
|
||||
|
||||
println!("Feature Summary:");
|
||||
@@ -627,12 +638,11 @@ impl ComparisonGenerator {
|
||||
);
|
||||
println!(" NewPayload latency (ms):");
|
||||
println!(
|
||||
" mean: {:.2}, p50: {:.2}, p90: {:.2}, p99: {:.2}, std dev: {:.2}",
|
||||
" mean: {:.2}, p50: {:.2}, p90: {:.2}, p99: {:.2}",
|
||||
feature.mean_new_payload_latency_ms,
|
||||
feature.median_new_payload_latency_ms,
|
||||
feature.p90_new_payload_latency_ms,
|
||||
feature.p99_new_payload_latency_ms,
|
||||
feature.std_dev_new_payload_latency_ms
|
||||
feature.p99_new_payload_latency_ms
|
||||
);
|
||||
if let (Some(start), Some(end)) =
|
||||
(&report.feature.start_timestamp, &report.feature.end_timestamp)
|
||||
@@ -643,6 +653,9 @@ impl ComparisonGenerator {
|
||||
end.format("%Y-%m-%d %H:%M:%S UTC")
|
||||
);
|
||||
}
|
||||
if let Some(ref cmd) = report.feature.reth_command {
|
||||
println!(" Command: {}", cmd);
|
||||
}
|
||||
println!();
|
||||
}
|
||||
}
|
||||
|
||||
@@ -240,19 +240,24 @@ impl NodeManager {
|
||||
}
|
||||
|
||||
/// Start a reth node using the specified binary path and return the process handle
|
||||
/// along with the formatted reth command string for reporting.
|
||||
pub(crate) async fn start_node(
|
||||
&mut self,
|
||||
binary_path: &std::path::Path,
|
||||
_git_ref: &str,
|
||||
ref_type: &str,
|
||||
additional_args: &[String],
|
||||
) -> Result<tokio::process::Child> {
|
||||
) -> Result<(tokio::process::Child, String)> {
|
||||
// Store the binary path for later use (e.g., in unwind_to_block)
|
||||
self.binary_path = Some(binary_path.to_path_buf());
|
||||
|
||||
let binary_path_str = binary_path.to_string_lossy();
|
||||
let (reth_args, _) = self.build_reth_args(&binary_path_str, additional_args, ref_type);
|
||||
|
||||
// Format the reth command string for reporting
|
||||
let reth_command = shlex::try_join(reth_args.iter().map(|s| s.as_str()))
|
||||
.wrap_err("Failed to format reth command string")?;
|
||||
|
||||
// Log additional arguments if any
|
||||
if !self.additional_reth_args.is_empty() {
|
||||
info!("Using common additional reth arguments: {:?}", self.additional_reth_args);
|
||||
@@ -346,7 +351,7 @@ impl NodeManager {
|
||||
// Give the node a moment to start up
|
||||
sleep(Duration::from_secs(5)).await;
|
||||
|
||||
Ok(child)
|
||||
Ok((child, reth_command))
|
||||
}
|
||||
|
||||
/// Wait for the node to be ready and return its current tip
|
||||
|
||||
@@ -143,5 +143,5 @@ To reproduce the benchmark, first re-set the node to the block that the benchmar
|
||||
- **RPC Configuration**: The RPC endpoints should be accessible and configured correctly, specifically the RPC endpoint must support `eth_getBlockByNumber` and support fetching full transactions. The benchmark will make one RPC query per block as fast as possible, so ensure the RPC endpoint does not rate limit or block requests after a certain volume.
|
||||
- **Reproducibility**: Ensure that the node is at the same state before attempting to retry a benchmark. The `new-payload-fcu` command specifically will commit to the database, so the node must be rolled back using `reth stage unwind` to reproducibly retry benchmarks.
|
||||
- **Profiling tools**: If you are collecting CPU profiles, tools like [`samply`](https://github.com/mstange/samply) and [`perf`](https://perf.wiki.kernel.org/index.php/Main_Page) can be useful for analyzing node performance.
|
||||
- **Benchmark Data**: `reth-bench` additionally contains a `--benchmark.output` flag, which will output gas used benchmarks across the benchmark range in CSV format. This may be useful for further data analysis.
|
||||
- **Benchmark Data**: `reth-bench` additionally contains a `--output` flag, which will output gas used benchmarks across the benchmark range in CSV format. This may be useful for further data analysis.
|
||||
- **Platform Information**: To ensure accurate and reproducible benchmarking, document the platform details, including hardware specifications, OS version, and any other relevant information before publishing any benchmarks.
|
||||
|
||||
@@ -18,7 +18,7 @@ use reth_primitives_traits::{
|
||||
};
|
||||
use reth_storage_api::StateProviderBox;
|
||||
use reth_trie::{updates::TrieUpdatesSorted, HashedPostStateSorted, TrieInputSorted};
|
||||
use std::{collections::BTreeMap, sync::Arc, time::Instant};
|
||||
use std::{collections::BTreeMap, ops::Deref, sync::Arc, time::Instant};
|
||||
use tokio::sync::{broadcast, watch};
|
||||
|
||||
/// Size of the broadcast channel used to notify canonical state events.
|
||||
@@ -634,6 +634,8 @@ impl<N: NodePrimitives> BlockState<N> {
|
||||
/// We assume that the `Receipts` in the executed block `ExecutionOutcome`
|
||||
/// has only one element corresponding to the executed block associated to
|
||||
/// the state.
|
||||
///
|
||||
/// This clones the vector of receipts. To avoid it, use [`Self::executed_block_receipts_ref`].
|
||||
pub fn executed_block_receipts(&self) -> Vec<N::Receipt> {
|
||||
let receipts = self.receipts();
|
||||
|
||||
@@ -646,6 +648,22 @@ impl<N: NodePrimitives> BlockState<N> {
|
||||
receipts.first().cloned().unwrap_or_default()
|
||||
}
|
||||
|
||||
/// Returns a slice of `Receipt` of executed block that determines the state.
|
||||
/// We assume that the `Receipts` in the executed block `ExecutionOutcome`
|
||||
/// has only one element corresponding to the executed block associated to
|
||||
/// the state.
|
||||
pub fn executed_block_receipts_ref(&self) -> &[N::Receipt] {
|
||||
let receipts = self.receipts();
|
||||
|
||||
debug_assert!(
|
||||
receipts.len() <= 1,
|
||||
"Expected at most one block's worth of receipts, found {}",
|
||||
receipts.len()
|
||||
);
|
||||
|
||||
receipts.first().map(|receipts| receipts.deref()).unwrap_or_default()
|
||||
}
|
||||
|
||||
/// Returns a vector of __parent__ `BlockStates`.
|
||||
///
|
||||
/// The block state order in the output vector is newest to oldest (highest to lowest):
|
||||
|
||||
@@ -8,12 +8,17 @@ use reth_db::{
|
||||
RawDupSort,
|
||||
};
|
||||
use reth_db_api::{
|
||||
table::{Decompress, DupSort, Table},
|
||||
tables, RawKey, RawTable, Receipts, TableViewer, Transactions,
|
||||
cursor::{DbCursorRO, DbDupCursorRO},
|
||||
database::Database,
|
||||
table::{Compress, Decompress, DupSort, Table},
|
||||
tables,
|
||||
transaction::DbTx,
|
||||
RawKey, RawTable, Receipts, TableViewer, Transactions,
|
||||
};
|
||||
use reth_db_common::DbTool;
|
||||
use reth_node_api::{HeaderTy, ReceiptTy, TxTy};
|
||||
use reth_node_builder::NodeTypesWithDB;
|
||||
use reth_primitives_traits::ValueWithSubKey;
|
||||
use reth_provider::{providers::ProviderNodeTypes, StaticFileProviderFactory};
|
||||
use reth_static_file_types::StaticFileSegment;
|
||||
use tracing::error;
|
||||
@@ -39,6 +44,14 @@ enum Subcommand {
|
||||
#[arg(value_parser = maybe_json_value_parser)]
|
||||
subkey: Option<String>,
|
||||
|
||||
/// Optional end key for range query (exclusive upper bound)
|
||||
#[arg(value_parser = maybe_json_value_parser)]
|
||||
end_key: Option<String>,
|
||||
|
||||
/// Optional end subkey for range query (exclusive upper bound)
|
||||
#[arg(value_parser = maybe_json_value_parser)]
|
||||
end_subkey: Option<String>,
|
||||
|
||||
/// Output bytes instead of human-readable decoded value
|
||||
#[arg(long)]
|
||||
raw: bool,
|
||||
@@ -61,8 +74,8 @@ impl Command {
|
||||
/// Execute `db get` command
|
||||
pub fn execute<N: ProviderNodeTypes>(self, tool: &DbTool<N>) -> eyre::Result<()> {
|
||||
match self.subcommand {
|
||||
Subcommand::Mdbx { table, key, subkey, raw } => {
|
||||
table.view(&GetValueViewer { tool, key, subkey, raw })?
|
||||
Subcommand::Mdbx { table, key, subkey, end_key, end_subkey, raw } => {
|
||||
table.view(&GetValueViewer { tool, key, subkey, end_key, end_subkey, raw })?
|
||||
}
|
||||
Subcommand::StaticFile { segment, key, raw } => {
|
||||
let (key, mask): (u64, _) = match segment {
|
||||
@@ -154,6 +167,8 @@ struct GetValueViewer<'a, N: NodeTypesWithDB> {
|
||||
tool: &'a DbTool<N>,
|
||||
key: String,
|
||||
subkey: Option<String>,
|
||||
end_key: Option<String>,
|
||||
end_subkey: Option<String>,
|
||||
raw: bool,
|
||||
}
|
||||
|
||||
@@ -163,53 +178,158 @@ impl<N: ProviderNodeTypes> TableViewer<()> for GetValueViewer<'_, N> {
|
||||
fn view<T: Table>(&self) -> Result<(), Self::Error> {
|
||||
let key = table_key::<T>(&self.key)?;
|
||||
|
||||
let content = if self.raw {
|
||||
self.tool
|
||||
.get::<RawTable<T>>(RawKey::from(key))?
|
||||
.map(|content| hex::encode_prefixed(content.raw_value()))
|
||||
} else {
|
||||
self.tool.get::<T>(key)?.as_ref().map(serde_json::to_string_pretty).transpose()?
|
||||
};
|
||||
// A non-dupsort table cannot have subkeys. The `subkey` arg becomes the `end_key`. First we
|
||||
// check that `end_key` and `end_subkey` weren't previously given, as that wouldn't be
|
||||
// valid.
|
||||
if self.end_key.is_some() || self.end_subkey.is_some() {
|
||||
return Err(eyre::eyre!("Only END_KEY can be given for non-DUPSORT tables"));
|
||||
}
|
||||
|
||||
match content {
|
||||
Some(content) => {
|
||||
println!("{content}");
|
||||
}
|
||||
None => {
|
||||
error!(target: "reth::cli", "No content for the given table key.");
|
||||
}
|
||||
};
|
||||
let end_key = self.subkey.clone();
|
||||
|
||||
// Check if we're doing a range query
|
||||
if let Some(ref end_key_str) = end_key {
|
||||
let end_key = table_key::<T>(end_key_str)?;
|
||||
|
||||
// Use walk_range to iterate over the range
|
||||
self.tool.provider_factory.db_ref().view(|tx| {
|
||||
let mut cursor = tx.cursor_read::<T>()?;
|
||||
let walker = cursor.walk_range(key..end_key)?;
|
||||
|
||||
for result in walker {
|
||||
let (k, v) = result?;
|
||||
let json_val = if self.raw {
|
||||
let raw_key = RawKey::from(k);
|
||||
serde_json::json!({
|
||||
"key": hex::encode_prefixed(raw_key.raw_key()),
|
||||
"val": hex::encode_prefixed(v.compress().as_ref()),
|
||||
})
|
||||
} else {
|
||||
serde_json::json!({
|
||||
"key": &k,
|
||||
"val": &v,
|
||||
})
|
||||
};
|
||||
|
||||
println!("{}", serde_json::to_string_pretty(&json_val)?);
|
||||
}
|
||||
|
||||
Ok::<_, eyre::Report>(())
|
||||
})??;
|
||||
} else {
|
||||
// Single key lookup
|
||||
let content = if self.raw {
|
||||
self.tool
|
||||
.get::<RawTable<T>>(RawKey::from(key))?
|
||||
.map(|content| hex::encode_prefixed(content.raw_value()))
|
||||
} else {
|
||||
self.tool.get::<T>(key)?.as_ref().map(serde_json::to_string_pretty).transpose()?
|
||||
};
|
||||
|
||||
match content {
|
||||
Some(content) => {
|
||||
println!("{content}");
|
||||
}
|
||||
None => {
|
||||
error!(target: "reth::cli", "No content for the given table key.");
|
||||
}
|
||||
};
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn view_dupsort<T: DupSort>(&self) -> Result<(), Self::Error> {
|
||||
fn view_dupsort<T: DupSort>(&self) -> Result<(), Self::Error>
|
||||
where
|
||||
T::Value: reth_primitives_traits::ValueWithSubKey<SubKey = T::SubKey>,
|
||||
{
|
||||
// get a key for given table
|
||||
let key = table_key::<T>(&self.key)?;
|
||||
|
||||
// process dupsort table
|
||||
let subkey = table_subkey::<T>(self.subkey.as_deref())?;
|
||||
|
||||
let content = if self.raw {
|
||||
self.tool
|
||||
.get_dup::<RawDupSort<T>>(RawKey::from(key), RawKey::from(subkey))?
|
||||
.map(|content| hex::encode_prefixed(content.raw_value()))
|
||||
} else {
|
||||
self.tool
|
||||
.get_dup::<T>(key, subkey)?
|
||||
// Check if we're doing a range query
|
||||
if let Some(ref end_key_str) = self.end_key {
|
||||
let end_key = table_key::<T>(end_key_str)?;
|
||||
let start_subkey = table_subkey::<T>(Some(
|
||||
self.subkey.as_ref().expect("must have been given if end_key is given").as_str(),
|
||||
))?;
|
||||
let end_subkey_parsed = self
|
||||
.end_subkey
|
||||
.as_ref()
|
||||
.map(serde_json::to_string_pretty)
|
||||
.transpose()?
|
||||
};
|
||||
.map(|s| table_subkey::<T>(Some(s.as_str())))
|
||||
.transpose()?;
|
||||
|
||||
match content {
|
||||
Some(content) => {
|
||||
println!("{content}");
|
||||
}
|
||||
None => {
|
||||
error!(target: "reth::cli", "No content for the given table subkey.");
|
||||
}
|
||||
};
|
||||
self.tool.provider_factory.db_ref().view(|tx| {
|
||||
let mut cursor = tx.cursor_dup_read::<T>()?;
|
||||
|
||||
// Seek to the starting key. If there is actually a key at the starting key then
|
||||
// seek to the subkey within it.
|
||||
if let Some((decoded_key, _)) = cursor.seek(key.clone())? &&
|
||||
decoded_key == key
|
||||
{
|
||||
cursor.seek_by_key_subkey(key.clone(), start_subkey.clone())?;
|
||||
}
|
||||
|
||||
// Get the current position to start iteration
|
||||
let mut current = cursor.current()?;
|
||||
|
||||
while let Some((decoded_key, decoded_value)) = current {
|
||||
// Extract the subkey using the ValueWithSubKey trait
|
||||
let decoded_subkey = decoded_value.get_subkey();
|
||||
|
||||
// Check if we've reached the end (exclusive)
|
||||
if (&decoded_key, Some(&decoded_subkey)) >=
|
||||
(&end_key, end_subkey_parsed.as_ref())
|
||||
{
|
||||
break;
|
||||
}
|
||||
|
||||
// Output the entry with both key and subkey
|
||||
let json_val = if self.raw {
|
||||
let raw_key = RawKey::from(decoded_key.clone());
|
||||
serde_json::json!({
|
||||
"key": hex::encode_prefixed(raw_key.raw_key()),
|
||||
"val": hex::encode_prefixed(decoded_value.compress().as_ref()),
|
||||
})
|
||||
} else {
|
||||
serde_json::json!({
|
||||
"key": &decoded_key,
|
||||
"val": &decoded_value,
|
||||
})
|
||||
};
|
||||
|
||||
println!("{}", serde_json::to_string_pretty(&json_val)?);
|
||||
|
||||
// Move to next entry
|
||||
current = cursor.next()?;
|
||||
}
|
||||
|
||||
Ok::<_, eyre::Report>(())
|
||||
})??;
|
||||
} else {
|
||||
// Single key/subkey lookup
|
||||
let subkey = table_subkey::<T>(self.subkey.as_deref())?;
|
||||
|
||||
let content = if self.raw {
|
||||
self.tool
|
||||
.get_dup::<RawDupSort<T>>(RawKey::from(key), RawKey::from(subkey))?
|
||||
.map(|content| hex::encode_prefixed(content.raw_value()))
|
||||
} else {
|
||||
self.tool
|
||||
.get_dup::<T>(key, subkey)?
|
||||
.as_ref()
|
||||
.map(serde_json::to_string_pretty)
|
||||
.transpose()?
|
||||
};
|
||||
|
||||
match content {
|
||||
Some(content) => {
|
||||
println!("{content}");
|
||||
}
|
||||
None => {
|
||||
error!(target: "reth::cli", "No content for the given table subkey.");
|
||||
}
|
||||
};
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
@@ -110,7 +110,7 @@ impl<C: ChainSpecParser<ChainSpec: EthChainSpec + EthereumHardforks>> InitStateC
|
||||
static_file_provider.commit()?;
|
||||
} else if last_block_number > 0 && last_block_number < header.number() {
|
||||
return Err(eyre::eyre!(
|
||||
"Data directory should be empty when calling init-state with --without-evm-history."
|
||||
"Data directory should be empty when calling init-state with --without-evm."
|
||||
));
|
||||
}
|
||||
}
|
||||
|
||||
@@ -531,8 +531,12 @@ impl PruneConfig {
|
||||
self.segments.receipts.is_some() || !self.segments.receipts_log_filter.is_empty()
|
||||
}
|
||||
|
||||
/// Merges another `PruneConfig` into this one, taking values from the other config if and only
|
||||
/// if the corresponding value in this config is not set.
|
||||
/// Merges values from `other` into `self`.
|
||||
/// - `Option<PruneMode>` fields: set from `other` only if `self` is `None`.
|
||||
/// - `block_interval`: set from `other` only if `self.block_interval ==
|
||||
/// DEFAULT_BLOCK_INTERVAL`.
|
||||
/// - `merkle_changesets`: always set from `other`.
|
||||
/// - `receipts_log_filter`: set from `other` only if `self` is empty and `other` is non-empty.
|
||||
pub fn merge(&mut self, other: Self) {
|
||||
let Self {
|
||||
block_interval,
|
||||
@@ -561,7 +565,7 @@ impl PruneConfig {
|
||||
self.segments.account_history = self.segments.account_history.or(account_history);
|
||||
self.segments.storage_history = self.segments.storage_history.or(storage_history);
|
||||
self.segments.bodies_history = self.segments.bodies_history.or(bodies_history);
|
||||
// Merkle changesets is not optional, so we just replace it if provided
|
||||
// Merkle changesets is not optional; always take the value from `other`
|
||||
self.segments.merkle_changesets = merkle_changesets;
|
||||
|
||||
if self.segments.receipts_log_filter.0.is_empty() && !receipts_log_filter.0.is_empty() {
|
||||
|
||||
@@ -327,7 +327,7 @@ pub fn validate_against_parent_eip1559_base_fee<ChainSpec: EthChainSpec + Ethere
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Validates the timestamp against the parent to make sure it is in the past.
|
||||
/// Validates that the block timestamp is greater than the parent block timestamp.
|
||||
#[inline]
|
||||
pub fn validate_against_parent_timestamp<H: BlockHeader>(
|
||||
header: &H,
|
||||
|
||||
@@ -63,7 +63,7 @@ impl EngineApiMetrics {
|
||||
pub(crate) fn execute_metered<E, DB>(
|
||||
&self,
|
||||
executor: E,
|
||||
transactions: impl Iterator<Item = Result<impl ExecutableTx<E>, BlockExecutionError>>,
|
||||
mut transactions: impl Iterator<Item = Result<impl ExecutableTx<E>, BlockExecutionError>>,
|
||||
state_hook: Box<dyn OnStateHook>,
|
||||
) -> Result<(BlockExecutionOutput<E::Receipt>, Vec<Address>), BlockExecutionError>
|
||||
where
|
||||
@@ -79,27 +79,42 @@ impl EngineApiMetrics {
|
||||
let mut executor = executor.with_state_hook(Some(Box::new(wrapper)));
|
||||
|
||||
let f = || {
|
||||
let start = Instant::now();
|
||||
debug_span!(target: "engine::tree", "pre execution")
|
||||
.entered()
|
||||
.in_scope(|| executor.apply_pre_execution_changes())?;
|
||||
self.executor.pre_execution_histogram.record(start.elapsed());
|
||||
|
||||
let exec_span = debug_span!(target: "engine::tree", "execution").entered();
|
||||
for tx in transactions {
|
||||
loop {
|
||||
let start = Instant::now();
|
||||
let Some(tx) = transactions.next() else { break };
|
||||
self.executor.transaction_wait_histogram.record(start.elapsed());
|
||||
|
||||
let tx = tx?;
|
||||
senders.push(*tx.signer());
|
||||
|
||||
let span =
|
||||
debug_span!(target: "engine::tree", "execute tx", tx_hash=?tx.tx().tx_hash());
|
||||
let enter = span.entered();
|
||||
trace!(target: "engine::tree", "Executing transaction");
|
||||
senders.push(*tx.signer());
|
||||
let start = Instant::now();
|
||||
let gas_used = executor.execute_transaction(tx)?;
|
||||
self.executor.transaction_execution_histogram.record(start.elapsed());
|
||||
|
||||
// record the tx gas used
|
||||
enter.record("gas_used", gas_used);
|
||||
}
|
||||
drop(exec_span);
|
||||
debug_span!(target: "engine::tree", "finish")
|
||||
|
||||
let start = Instant::now();
|
||||
let result = debug_span!(target: "engine::tree", "finish")
|
||||
.entered()
|
||||
.in_scope(|| executor.finish())
|
||||
.map(|(evm, result)| (evm.into_db(), result))
|
||||
.map(|(evm, result)| (evm.into_db(), result));
|
||||
self.executor.post_execution_histogram.record(start.elapsed());
|
||||
|
||||
result
|
||||
};
|
||||
|
||||
// Use metered to execute and track timing/gas metrics
|
||||
|
||||
File diff suppressed because it is too large
Load Diff
@@ -252,7 +252,7 @@ where
|
||||
|
||||
/// Extracts block headers and bodies from `iter` and appends them using `writer` and `provider`.
|
||||
///
|
||||
/// Adds on to `total_difficulty` and collects hash to height using `hash_collector`.
|
||||
/// Collects hash to height using `hash_collector`.
|
||||
///
|
||||
/// Skips all blocks below the [`start_bound`] of `block_numbers` and stops when reaching past the
|
||||
/// [`end_bound`] or the end of the file.
|
||||
|
||||
@@ -17,6 +17,14 @@ pub struct ExecutorMetrics {
|
||||
/// The Histogram for amount of gas used.
|
||||
pub gas_used_histogram: Histogram,
|
||||
|
||||
/// The Histogram for amount of time taken to execute the pre-execution changes.
|
||||
pub pre_execution_histogram: Histogram,
|
||||
/// The Histogram for amount of time taken to wait for one transaction to be available.
|
||||
pub transaction_wait_histogram: Histogram,
|
||||
/// The Histogram for amount of time taken to execute one transaction.
|
||||
pub transaction_execution_histogram: Histogram,
|
||||
/// The Histogram for amount of time taken to execute the post-execution changes.
|
||||
pub post_execution_histogram: Histogram,
|
||||
/// The Histogram for amount of time taken to execute blocks.
|
||||
pub execution_histogram: Histogram,
|
||||
/// The total amount of time it took to execute the latest block.
|
||||
|
||||
@@ -95,17 +95,33 @@ pub(super) mod serde_bincode_compat {
|
||||
/// notification: ExExNotification<N>,
|
||||
/// }
|
||||
/// ```
|
||||
///
|
||||
/// This enum mirrors [`super::ExExNotification`] but uses borrowed [`Chain`] types
|
||||
/// instead of `Arc<Chain>` for bincode compatibility.
|
||||
#[derive(Debug, Serialize, Deserialize)]
|
||||
#[expect(missing_docs)]
|
||||
#[serde(bound = "")]
|
||||
#[expect(clippy::large_enum_variant)]
|
||||
pub enum ExExNotification<'a, N>
|
||||
where
|
||||
N: NodePrimitives,
|
||||
{
|
||||
ChainCommitted { new: Chain<'a, N> },
|
||||
ChainReorged { old: Chain<'a, N>, new: Chain<'a, N> },
|
||||
ChainReverted { old: Chain<'a, N> },
|
||||
/// Chain got committed without a reorg, and only the new chain is returned.
|
||||
ChainCommitted {
|
||||
/// The new chain after commit.
|
||||
new: Chain<'a, N>,
|
||||
},
|
||||
/// Chain got reorged, and both the old and the new chains are returned.
|
||||
ChainReorged {
|
||||
/// The old chain before reorg.
|
||||
old: Chain<'a, N>,
|
||||
/// The new chain after reorg.
|
||||
new: Chain<'a, N>,
|
||||
},
|
||||
/// Chain got reverted, and only the old chain is returned.
|
||||
ChainReverted {
|
||||
/// The old chain before reversion.
|
||||
old: Chain<'a, N>,
|
||||
},
|
||||
}
|
||||
|
||||
impl<'a, N> From<&'a super::ExExNotification<N>> for ExExNotification<'a, N>
|
||||
|
||||
@@ -131,6 +131,8 @@ pub struct TransactionsManagerMetrics {
|
||||
/// capacity. Note, this is not a limit to the number of inflight requests, but a health
|
||||
/// measure.
|
||||
pub(crate) capacity_pending_pool_imports: Counter,
|
||||
/// The time it took to prepare transactions for import. This is mostly sender recovery.
|
||||
pub(crate) pool_import_prepare_duration: Histogram,
|
||||
|
||||
/* ================ POLL DURATION ================ */
|
||||
|
||||
|
||||
@@ -1338,6 +1338,8 @@ where
|
||||
let Some(peer) = self.peers.get_mut(&peer_id) else { return };
|
||||
let mut transactions = transactions.0;
|
||||
|
||||
let start = Instant::now();
|
||||
|
||||
// mark the transactions as received
|
||||
self.transaction_fetcher
|
||||
.remove_hashes_from_transaction_fetcher(transactions.iter().map(|tx| tx.tx_hash()));
|
||||
@@ -1459,6 +1461,8 @@ where
|
||||
if num_already_seen_by_peer > 0 {
|
||||
self.report_already_seen(peer_id);
|
||||
}
|
||||
|
||||
self.metrics.pool_import_prepare_duration.record(start.elapsed());
|
||||
}
|
||||
|
||||
/// Processes a [`FetchEvent`].
|
||||
|
||||
@@ -164,7 +164,7 @@ pub use alloy_primitives::{logs_bloom, Log, LogData};
|
||||
pub mod proofs;
|
||||
|
||||
mod storage;
|
||||
pub use storage::StorageEntry;
|
||||
pub use storage::{StorageEntry, ValueWithSubKey};
|
||||
|
||||
pub mod sync;
|
||||
|
||||
|
||||
@@ -1,5 +1,17 @@
|
||||
use alloy_primitives::{B256, U256};
|
||||
|
||||
/// Trait for `DupSort` table values that contain a subkey.
|
||||
///
|
||||
/// This trait allows extracting the subkey from a value during database iteration,
|
||||
/// enabling proper range queries and filtering on `DupSort` tables.
|
||||
pub trait ValueWithSubKey {
|
||||
/// The type of the subkey.
|
||||
type SubKey;
|
||||
|
||||
/// Extract the subkey from the value.
|
||||
fn get_subkey(&self) -> Self::SubKey;
|
||||
}
|
||||
|
||||
/// Account storage entry.
|
||||
///
|
||||
/// `key` is the subkey when used as a value in the `StorageChangeSets` table.
|
||||
@@ -21,6 +33,14 @@ impl StorageEntry {
|
||||
}
|
||||
}
|
||||
|
||||
impl ValueWithSubKey for StorageEntry {
|
||||
type SubKey = B256;
|
||||
|
||||
fn get_subkey(&self) -> Self::SubKey {
|
||||
self.key
|
||||
}
|
||||
}
|
||||
|
||||
impl From<(B256, U256)> for StorageEntry {
|
||||
fn from((key, value): (B256, U256)) -> Self {
|
||||
Self { key, value }
|
||||
|
||||
@@ -107,18 +107,13 @@ impl RpcServiceT for RpcService {
|
||||
|
||||
fn batch<'a>(&self, req: Batch<'a>) -> impl Future<Output = Self::BatchResponse> + Send + 'a {
|
||||
let entries: Vec<_> = req.into_iter().collect();
|
||||
|
||||
let mut got_notif = false;
|
||||
let mut batch_response = BatchResponseBuilder::new_with_limit(self.max_response_body_size);
|
||||
|
||||
let mut pending_calls: FuturesOrdered<_> = entries
|
||||
.into_iter()
|
||||
.filter_map(|v| match v {
|
||||
Ok(BatchEntry::Call(call)) => Some(Either::Right(self.call(call))),
|
||||
Ok(BatchEntry::Notification(_n)) => {
|
||||
got_notif = true;
|
||||
None
|
||||
}
|
||||
Ok(BatchEntry::Notification(_n)) => None,
|
||||
Err(_err) => Some(Either::Left(async {
|
||||
MethodResponse::error(Id::Null, ErrorObject::from(ErrorCode::InvalidRequest))
|
||||
})),
|
||||
|
||||
@@ -110,7 +110,8 @@ pub trait EthFees:
|
||||
// increasing and 0 <= p <= 100
|
||||
// Note: The types used ensure that the percentiles are never < 0
|
||||
if let Some(percentiles) = &reward_percentiles &&
|
||||
percentiles.windows(2).any(|w| w[0] > w[1] || w[0] > 100.)
|
||||
(percentiles.iter().any(|p| *p < 0.0 || *p > 100.0) ||
|
||||
percentiles.windows(2).any(|w| w[0] > w[1]))
|
||||
{
|
||||
return Err(EthApiError::InvalidRewardPercentiles.into())
|
||||
}
|
||||
|
||||
@@ -11,6 +11,7 @@ use reth_errors::ProviderError;
|
||||
use reth_primitives_traits::{BlockBody, RecoveredBlock, SignedTransaction};
|
||||
use reth_storage_api::{BlockReader, ProviderBlock};
|
||||
use std::sync::Arc;
|
||||
use thiserror::Error;
|
||||
|
||||
/// Returns all matching of a block's receipts when the transaction hashes are known.
|
||||
pub fn matching_block_logs_with_tx_hashes<'a, I, R>(
|
||||
@@ -147,30 +148,40 @@ where
|
||||
|
||||
/// Computes the block range based on the filter range and current block numbers.
|
||||
///
|
||||
/// This returns `(min(best,from), min(best,to))`.
|
||||
/// Returns an error for invalid ranges rather than silently clamping values.
|
||||
pub fn get_filter_block_range(
|
||||
from_block: Option<u64>,
|
||||
to_block: Option<u64>,
|
||||
start_block: u64,
|
||||
info: ChainInfo,
|
||||
) -> (u64, u64) {
|
||||
let mut from_block_number = start_block;
|
||||
let mut to_block_number = info.best_number;
|
||||
) -> Result<(u64, u64), FilterBlockRangeError> {
|
||||
let from_block_number = from_block.unwrap_or(start_block);
|
||||
let to_block_number = to_block.unwrap_or(info.best_number);
|
||||
|
||||
// if a `from_block` argument is provided then the `from_block_number` is the converted value or
|
||||
// the start block if the converted value is larger than the start block, since `from_block`
|
||||
// can't be a future block: `min(head, from_block)`
|
||||
if let Some(filter_from_block) = from_block {
|
||||
from_block_number = start_block.min(filter_from_block)
|
||||
// from > to is an invalid range
|
||||
if from_block_number > to_block_number {
|
||||
return Err(FilterBlockRangeError::InvalidBlockRange);
|
||||
}
|
||||
|
||||
// upper end of the range is the converted `to_block` argument, restricted by the best block:
|
||||
// `min(best_number,to_block_number)`
|
||||
if let Some(filter_to_block) = to_block {
|
||||
to_block_number = info.best_number.min(filter_to_block);
|
||||
// we cannot query blocks that don't exist yet
|
||||
if to_block_number > info.best_number {
|
||||
return Err(FilterBlockRangeError::BlockRangeExceedsHead);
|
||||
}
|
||||
|
||||
(from_block_number, to_block_number)
|
||||
Ok((from_block_number, to_block_number))
|
||||
}
|
||||
|
||||
/// Errors for filter block range validation.
|
||||
///
|
||||
/// See also <https://github.com/ethereum/go-ethereum/blob/master/eth/filters/filter.go#L224-L230>.
|
||||
#[derive(Debug, Clone, Copy, PartialEq, Eq, Error)]
|
||||
pub enum FilterBlockRangeError {
|
||||
/// `from_block > to_block`
|
||||
#[error("invalid block range params")]
|
||||
InvalidBlockRange,
|
||||
/// Block range extends beyond current head
|
||||
#[error("block range extends beyond current head block")]
|
||||
BlockRangeExceedsHead,
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
@@ -184,44 +195,73 @@ mod tests {
|
||||
let from = 14000000u64;
|
||||
let to = 14000100u64;
|
||||
let info = ChainInfo { best_number: 15000000, ..Default::default() };
|
||||
let range = get_filter_block_range(Some(from), Some(to), info.best_number, info);
|
||||
let range = get_filter_block_range(Some(from), Some(to), info.best_number, info).unwrap();
|
||||
assert_eq!(range, (from, to));
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_log_range_higher() {
|
||||
let from = 15000001u64;
|
||||
let to = 15000002u64;
|
||||
let info = ChainInfo { best_number: 15000000, ..Default::default() };
|
||||
let range = get_filter_block_range(Some(from), Some(to), info.best_number, info);
|
||||
assert_eq!(range, (info.best_number, info.best_number));
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_log_range_from() {
|
||||
let from = 14000000u64;
|
||||
let info = ChainInfo { best_number: 15000000, ..Default::default() };
|
||||
let range = get_filter_block_range(Some(from), None, info.best_number, info);
|
||||
let range = get_filter_block_range(Some(from), None, 0, info).unwrap();
|
||||
assert_eq!(range, (from, info.best_number));
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_log_range_to() {
|
||||
let to = 14000000u64;
|
||||
let start_block = 0u64;
|
||||
let info = ChainInfo { best_number: 15000000, ..Default::default() };
|
||||
let range = get_filter_block_range(None, Some(to), info.best_number, info);
|
||||
assert_eq!(range, (info.best_number, to));
|
||||
let range = get_filter_block_range(None, Some(to), start_block, info).unwrap();
|
||||
assert_eq!(range, (start_block, to));
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_log_range_higher_error() {
|
||||
// Range extends beyond head -> should error instead of clamping
|
||||
let from = 15000001u64;
|
||||
let to = 15000002u64;
|
||||
let info = ChainInfo { best_number: 15000000, ..Default::default() };
|
||||
let err = get_filter_block_range(Some(from), Some(to), info.best_number, info).unwrap_err();
|
||||
assert_eq!(err, FilterBlockRangeError::BlockRangeExceedsHead);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_log_range_to_below_start_error() {
|
||||
// to_block < start_block, default from -> invalid range
|
||||
let to = 14000000u64;
|
||||
let info = ChainInfo { best_number: 15000000, ..Default::default() };
|
||||
let err = get_filter_block_range(None, Some(to), info.best_number, info).unwrap_err();
|
||||
assert_eq!(err, FilterBlockRangeError::InvalidBlockRange);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_log_range_empty() {
|
||||
let info = ChainInfo { best_number: 15000000, ..Default::default() };
|
||||
let range = get_filter_block_range(None, None, info.best_number, info);
|
||||
let range = get_filter_block_range(None, None, info.best_number, info).unwrap();
|
||||
|
||||
// no range given -> head
|
||||
assert_eq!(range, (info.best_number, info.best_number));
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_invalid_block_range_error() {
|
||||
let from = 100;
|
||||
let to = 50;
|
||||
let info = ChainInfo { best_number: 150, ..Default::default() };
|
||||
let err = get_filter_block_range(Some(from), Some(to), 0, info).unwrap_err();
|
||||
assert_eq!(err, FilterBlockRangeError::InvalidBlockRange);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_block_range_exceeds_head_error() {
|
||||
let from = 100;
|
||||
let to = 200;
|
||||
let info = ChainInfo { best_number: 150, ..Default::default() };
|
||||
let err = get_filter_block_range(Some(from), Some(to), 0, info).unwrap_err();
|
||||
assert_eq!(err, FilterBlockRangeError::BlockRangeExceedsHead);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn parse_log_from_only() {
|
||||
let s = r#"{"fromBlock":"0xf47a42","address":["0x7de93682b9b5d80d45cd371f7a14f74d49b0914c","0x0f00392fcb466c0e4e4310d81b941e07b4d5a079","0xebf67ab8cff336d3f609127e8bbf8bd6dd93cd81"],"topics":["0x0559884fd3a460db3073b7fc896cc77986f16e378210ded43186175bf646fc5f"]}"#;
|
||||
@@ -242,7 +282,8 @@ mod tests {
|
||||
to_block.and_then(alloy_rpc_types_eth::BlockNumberOrTag::as_number),
|
||||
start_block,
|
||||
info,
|
||||
);
|
||||
)
|
||||
.unwrap();
|
||||
assert_eq!(from_block_number, 16022082);
|
||||
assert_eq!(to_block_number, best_number);
|
||||
}
|
||||
|
||||
@@ -267,7 +267,7 @@ where
|
||||
.map(|num| self.provider().convert_block_number(num))
|
||||
.transpose()?
|
||||
.flatten();
|
||||
logs_utils::get_filter_block_range(from, to, start_block, info)
|
||||
logs_utils::get_filter_block_range(from, to, start_block, info)?
|
||||
}
|
||||
FilterBlockOption::AtBlockHash(_) => {
|
||||
// blockHash is equivalent to fromBlock = toBlock = the block number with
|
||||
@@ -561,7 +561,7 @@ where
|
||||
}
|
||||
|
||||
let (from_block_number, to_block_number) =
|
||||
logs_utils::get_filter_block_range(from, to, start_block, info);
|
||||
logs_utils::get_filter_block_range(from, to, start_block, info)?;
|
||||
|
||||
self.get_logs_in_block_range(filter, from_block_number, to_block_number, limits)
|
||||
.await
|
||||
@@ -952,6 +952,15 @@ impl From<ProviderError> for EthFilterError {
|
||||
}
|
||||
}
|
||||
|
||||
impl From<logs_utils::FilterBlockRangeError> for EthFilterError {
|
||||
fn from(err: logs_utils::FilterBlockRangeError) -> Self {
|
||||
match err {
|
||||
logs_utils::FilterBlockRangeError::InvalidBlockRange => Self::InvalidBlockRangeParams,
|
||||
logs_utils::FilterBlockRangeError::BlockRangeExceedsHead => Self::BlockRangeExceedsHead,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Helper type for the common pattern of returning receipts, block and the original header that is
|
||||
/// a match for the filter.
|
||||
struct ReceiptBlockResult<P>
|
||||
|
||||
@@ -14,7 +14,9 @@ use reth_network_p2p::headers::{
|
||||
downloader::{HeaderDownloader, HeaderSyncGap, SyncTarget},
|
||||
error::HeadersDownloaderError,
|
||||
};
|
||||
use reth_primitives_traits::{serde_bincode_compat, FullBlockHeader, NodePrimitives, SealedHeader};
|
||||
use reth_primitives_traits::{
|
||||
serde_bincode_compat, FullBlockHeader, HeaderTy, NodePrimitives, SealedHeader,
|
||||
};
|
||||
use reth_provider::{
|
||||
providers::StaticFileWriter, BlockHashReader, DBProvider, HeaderSyncGapProvider,
|
||||
StaticFileProviderFactory,
|
||||
@@ -333,8 +335,9 @@ where
|
||||
(input.unwind_to + 1)..,
|
||||
)?;
|
||||
provider.tx_ref().unwind_table_by_num::<tables::CanonicalHeaders>(input.unwind_to)?;
|
||||
let unfinalized_headers_unwound =
|
||||
provider.tx_ref().unwind_table_by_num::<tables::Headers>(input.unwind_to)?;
|
||||
let unfinalized_headers_unwound = provider.tx_ref().unwind_table_by_num::<tables::Headers<
|
||||
HeaderTy<Provider::Primitives>,
|
||||
>>(input.unwind_to)?;
|
||||
|
||||
// determine how many headers to unwind from the static files based on the highest block and
|
||||
// the unwind_to block
|
||||
|
||||
@@ -94,7 +94,10 @@ pub trait TableViewer<R> {
|
||||
/// Operate on the dupsort table in a generic way.
|
||||
///
|
||||
/// By default, the `view` function is invoked unless overridden.
|
||||
fn view_dupsort<T: DupSort>(&self) -> Result<R, Self::Error> {
|
||||
fn view_dupsort<T: DupSort>(&self) -> Result<R, Self::Error>
|
||||
where
|
||||
T::Value: reth_primitives_traits::ValueWithSubKey<SubKey = T::SubKey>,
|
||||
{
|
||||
self.view::<T>()
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1,5 +1,5 @@
|
||||
use alloy_primitives::Address;
|
||||
use reth_primitives_traits::Account;
|
||||
use reth_primitives_traits::{Account, ValueWithSubKey};
|
||||
|
||||
/// Account as it is saved in the database.
|
||||
///
|
||||
@@ -15,6 +15,14 @@ pub struct AccountBeforeTx {
|
||||
pub info: Option<Account>,
|
||||
}
|
||||
|
||||
impl ValueWithSubKey for AccountBeforeTx {
|
||||
type SubKey = Address;
|
||||
|
||||
fn get_subkey(&self) -> Self::SubKey {
|
||||
self.address
|
||||
}
|
||||
}
|
||||
|
||||
// NOTE: Removing reth_codec and manually encode subkey
|
||||
// and compress second part of the value. If we have compression
|
||||
// over whole value (Even SubKey) that would mess up fetching of values with seek_by_key_subkey
|
||||
|
||||
@@ -1046,7 +1046,7 @@ impl<N: ProviderNodeTypes> ReceiptProvider for ConsistentProvider<N> {
|
||||
id.into(),
|
||||
|provider| provider.receipt(id),
|
||||
|tx_index, _, block_state| {
|
||||
Ok(block_state.executed_block_receipts().get(tx_index).cloned())
|
||||
Ok(block_state.executed_block_receipts_ref().get(tx_index).cloned())
|
||||
},
|
||||
)
|
||||
}
|
||||
@@ -1055,7 +1055,7 @@ impl<N: ProviderNodeTypes> ReceiptProvider for ConsistentProvider<N> {
|
||||
for block_state in self.head_block.iter().flat_map(|b| b.chain()) {
|
||||
let executed_block = block_state.block_ref();
|
||||
let block = executed_block.recovered_block();
|
||||
let receipts = block_state.executed_block_receipts();
|
||||
let receipts = block_state.executed_block_receipts_ref();
|
||||
|
||||
// assuming 1:1 correspondence between transactions and receipts
|
||||
debug_assert_eq!(
|
||||
|
||||
@@ -66,6 +66,12 @@ impl SenderId {
|
||||
std::ops::Bound::Included(TransactionId::new(self, 0))
|
||||
}
|
||||
|
||||
/// Returns a `Range` for [`TransactionId`] starting with nonce `0` and ending with nonce
|
||||
/// `u64::MAX`
|
||||
pub const fn range(self) -> std::ops::RangeInclusive<TransactionId> {
|
||||
TransactionId::new(self, 0)..=TransactionId::new(self, u64::MAX)
|
||||
}
|
||||
|
||||
/// Converts the sender to a [`TransactionId`] with the given nonce.
|
||||
pub const fn into_transaction_id(self, nonce: u64) -> TransactionId {
|
||||
TransactionId::new(self, nonce)
|
||||
|
||||
@@ -186,11 +186,11 @@ impl<T: ParkedOrd> ParkedPool<T> {
|
||||
{
|
||||
// NOTE: This will not panic due to `!last_sender_transaction.is_empty()`
|
||||
let sender_id = self.last_sender_submission.last().unwrap().sender_id;
|
||||
let list = self.get_txs_by_sender(sender_id);
|
||||
|
||||
// Drop transactions from this sender until the pool is under limits
|
||||
for txid in list.into_iter().rev() {
|
||||
if let Some(tx) = self.remove_transaction(&txid) {
|
||||
while let Some((tx_id, _)) = self.by_id.range(sender_id.range()).next_back() {
|
||||
let tx_id = *tx_id;
|
||||
if let Some(tx) = self.remove_transaction(&tx_id) {
|
||||
removed.push(tx);
|
||||
}
|
||||
|
||||
|
||||
@@ -283,6 +283,16 @@ pub trait TransactionPool: Clone + Debug + Send + Sync {
|
||||
NewSubpoolTransactionStream::new(self.new_transactions_listener(), SubPool::Queued)
|
||||
}
|
||||
|
||||
/// Returns a new Stream that yields new transactions added to the blob sub-pool.
|
||||
///
|
||||
/// This is a convenience wrapper around [`Self::new_transactions_listener`] that filters for
|
||||
/// [`SubPool::Blob`](crate::SubPool).
|
||||
fn new_blob_pool_transactions_listener(
|
||||
&self,
|
||||
) -> NewSubpoolTransactionStream<Self::Transaction> {
|
||||
NewSubpoolTransactionStream::new(self.new_transactions_listener(), SubPool::Blob)
|
||||
}
|
||||
|
||||
/// Returns the _hashes_ of all transactions in the pool that are allowed to be propagated.
|
||||
///
|
||||
/// This excludes hashes that aren't allowed to be propagated.
|
||||
@@ -1583,7 +1593,7 @@ pub struct PoolSize {
|
||||
pub queued_size: usize,
|
||||
/// Number of all transactions of all sub-pools
|
||||
///
|
||||
/// Note: this is the sum of ```pending + basefee + queued```
|
||||
/// Note: this is the sum of ```pending + basefee + queued + blob```
|
||||
pub total: usize,
|
||||
}
|
||||
|
||||
|
||||
@@ -31,6 +31,7 @@ nybbles = { workspace = true, features = ["rlp"] }
|
||||
|
||||
# reth
|
||||
revm-database.workspace = true
|
||||
revm-state.workspace = true
|
||||
|
||||
# `serde` feature
|
||||
serde = { workspace = true, optional = true }
|
||||
@@ -64,7 +65,6 @@ bincode.workspace = true
|
||||
serde.workspace = true
|
||||
serde_json.workspace = true
|
||||
serde_with.workspace = true
|
||||
revm-state.workspace = true
|
||||
|
||||
[features]
|
||||
default = ["std"]
|
||||
|
||||
@@ -83,6 +83,19 @@ impl MultiAddedRemovedKeys {
|
||||
self.storages.entry(address).or_insert_with(default_added_removed_keys);
|
||||
}
|
||||
}
|
||||
|
||||
/// Marks an account as removed.
|
||||
pub fn mark_account_removed(&mut self, account: B256) {
|
||||
self.account.insert_removed(account);
|
||||
}
|
||||
|
||||
/// Marks a storage slot as removed for the given account.
|
||||
pub fn mark_storage_removed(&mut self, hashed_address: B256, slot: B256) {
|
||||
self.storages
|
||||
.entry(hashed_address)
|
||||
.or_insert_with(default_added_removed_keys)
|
||||
.insert_removed(slot);
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
@@ -184,6 +197,147 @@ mod tests {
|
||||
assert!(!multi_keys.get_accounts().is_removed(&addr));
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_record_removals_is_monotonic() {
|
||||
use alloy_primitives::Address;
|
||||
use revm_state::{Account, AccountInfo, AccountStatus, EvmStorageSlot};
|
||||
|
||||
let mut multi_keys = MultiAddedRemovedKeys::new();
|
||||
let address = Address::random();
|
||||
let slot = U256::from(42);
|
||||
let hashed_addr = keccak256(address);
|
||||
let hashed_slot = keccak256(B256::from(slot));
|
||||
|
||||
// Update 1: Create slot with value 100
|
||||
let mut update1 = EvmState::default();
|
||||
update1.insert(
|
||||
address,
|
||||
Account {
|
||||
info: AccountInfo::default(),
|
||||
transaction_id: 0,
|
||||
storage: std::iter::once((
|
||||
slot,
|
||||
EvmStorageSlot::new_changed(U256::ZERO, U256::from(100), 0),
|
||||
))
|
||||
.collect(),
|
||||
status: AccountStatus::Touched,
|
||||
},
|
||||
);
|
||||
multi_keys.record_removals(&update1);
|
||||
|
||||
// Slot should NOT be marked as removed (value is 100, not 0)
|
||||
assert!(
|
||||
multi_keys.get_storage(&hashed_addr).is_none() ||
|
||||
!multi_keys.get_storage(&hashed_addr).unwrap().is_removed(&hashed_slot)
|
||||
);
|
||||
|
||||
// Update 2: Delete slot (set to 0)
|
||||
let mut update2 = EvmState::default();
|
||||
update2.insert(
|
||||
address,
|
||||
Account {
|
||||
info: AccountInfo::default(),
|
||||
transaction_id: 1,
|
||||
storage: std::iter::once((
|
||||
slot,
|
||||
EvmStorageSlot::new_changed(U256::from(100), U256::ZERO, 1),
|
||||
))
|
||||
.collect(),
|
||||
status: AccountStatus::Touched,
|
||||
},
|
||||
);
|
||||
multi_keys.record_removals(&update2);
|
||||
|
||||
// Slot should be marked as removed
|
||||
assert!(multi_keys.get_storage(&hashed_addr).unwrap().is_removed(&hashed_slot));
|
||||
|
||||
// Update 3: Recreate slot with value 200
|
||||
let mut update3 = EvmState::default();
|
||||
update3.insert(
|
||||
address,
|
||||
Account {
|
||||
info: AccountInfo::default(),
|
||||
transaction_id: 2,
|
||||
storage: std::iter::once((
|
||||
slot,
|
||||
EvmStorageSlot::new_changed(U256::ZERO, U256::from(200), 2),
|
||||
))
|
||||
.collect(),
|
||||
status: AccountStatus::Touched,
|
||||
},
|
||||
);
|
||||
multi_keys.record_removals(&update3);
|
||||
|
||||
// KEY ASSERTION: Still removed after recreation!
|
||||
// This is the critical difference from update_with_state.
|
||||
// Removals are monotonic - once removed, stays removed for proof invalidation.
|
||||
assert!(
|
||||
multi_keys.get_storage(&hashed_addr).unwrap().is_removed(&hashed_slot),
|
||||
"slot should remain marked as removed even after recreation"
|
||||
);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_record_removals_selfdestruct() {
|
||||
use alloy_primitives::Address;
|
||||
use revm_state::{Account, AccountInfo, AccountStatus};
|
||||
|
||||
let mut multi_keys = MultiAddedRemovedKeys::new();
|
||||
let address = Address::random();
|
||||
let hashed_addr = keccak256(address);
|
||||
|
||||
// Selfdestruct the account (must also be Touched to be processed)
|
||||
let mut update = EvmState::default();
|
||||
update.insert(
|
||||
address,
|
||||
Account {
|
||||
info: AccountInfo::default(),
|
||||
transaction_id: 0,
|
||||
storage: Default::default(),
|
||||
status: AccountStatus::SelfDestructed | AccountStatus::Touched,
|
||||
},
|
||||
);
|
||||
multi_keys.record_removals(&update);
|
||||
|
||||
// Account should be marked as removed
|
||||
assert!(multi_keys.get_accounts().is_removed(&hashed_addr));
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_record_removals_ignores_untouched() {
|
||||
use alloy_primitives::Address;
|
||||
use revm_state::{Account, AccountInfo, AccountStatus, EvmStorageSlot};
|
||||
|
||||
let mut multi_keys = MultiAddedRemovedKeys::new();
|
||||
let address = Address::random();
|
||||
let slot = U256::from(1);
|
||||
let hashed_addr = keccak256(address);
|
||||
let hashed_slot = keccak256(B256::from(slot));
|
||||
|
||||
// Create an untouched account with zero storage
|
||||
let mut update = EvmState::default();
|
||||
update.insert(
|
||||
address,
|
||||
Account {
|
||||
info: AccountInfo::default(),
|
||||
transaction_id: 0,
|
||||
storage: std::iter::once((
|
||||
slot,
|
||||
EvmStorageSlot::new_changed(U256::from(100), U256::ZERO, 0),
|
||||
))
|
||||
.collect(),
|
||||
status: AccountStatus::default(), // NOT touched
|
||||
},
|
||||
);
|
||||
multi_keys.record_removals(&update);
|
||||
|
||||
// Should NOT be marked as removed because account wasn't touched
|
||||
assert!(
|
||||
multi_keys.get_storage(&hashed_addr).is_none() ||
|
||||
!multi_keys.get_storage(&hashed_addr).unwrap().is_removed(&hashed_slot)
|
||||
);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_update_with_state_account_with_balance() {
|
||||
let mut multi_keys = MultiAddedRemovedKeys::new();
|
||||
|
||||
@@ -61,9 +61,6 @@ impl TrieInput {
|
||||
}
|
||||
|
||||
/// Extend the trie input with the provided blocks, from oldest to newest.
|
||||
///
|
||||
/// For blocks with missing trie updates, the trie input will be extended with prefix sets
|
||||
/// constructed from the state of this block and the state itself, **without** trie updates.
|
||||
pub fn extend_with_blocks<'a>(
|
||||
&mut self,
|
||||
blocks: impl IntoIterator<Item = (&'a HashedPostState, &'a TrieUpdates)>,
|
||||
|
||||
@@ -1,4 +1,5 @@
|
||||
use super::{BranchNodeCompact, StoredNibblesSubKey};
|
||||
use reth_primitives_traits::ValueWithSubKey;
|
||||
|
||||
/// Account storage trie node.
|
||||
///
|
||||
@@ -12,6 +13,14 @@ pub struct StorageTrieEntry {
|
||||
pub node: BranchNodeCompact,
|
||||
}
|
||||
|
||||
impl ValueWithSubKey for StorageTrieEntry {
|
||||
type SubKey = StoredNibblesSubKey;
|
||||
|
||||
fn get_subkey(&self) -> Self::SubKey {
|
||||
self.nibbles.clone()
|
||||
}
|
||||
}
|
||||
|
||||
// NOTE: Removing reth_codec and manually encode subkey
|
||||
// and compress second part of the value. If we have compression
|
||||
// over whole value (Even SubKey) that would mess up fetching of values with seek_by_key_subkey
|
||||
@@ -46,6 +55,14 @@ pub struct TrieChangeSetsEntry {
|
||||
pub node: Option<BranchNodeCompact>,
|
||||
}
|
||||
|
||||
impl ValueWithSubKey for TrieChangeSetsEntry {
|
||||
type SubKey = StoredNibblesSubKey;
|
||||
|
||||
fn get_subkey(&self) -> Self::SubKey {
|
||||
self.nibbles.clone()
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(any(test, feature = "reth-codec"))]
|
||||
impl reth_codecs::Compact for TrieChangeSetsEntry {
|
||||
fn to_compact<B>(&self, buf: &mut B) -> usize
|
||||
|
||||
@@ -126,8 +126,7 @@ impl ParallelProof {
|
||||
)))
|
||||
})?;
|
||||
|
||||
// Extract storage proof directly from the result.
|
||||
// The proof is Arc-wrapped for efficient batch sharing, so we unwrap it here.
|
||||
// Extract storage proof directly from the result
|
||||
let storage_proof = match proof_msg.result? {
|
||||
crate::proof_task::ProofResult::StorageProof { hashed_address: addr, proof } => {
|
||||
debug_assert_eq!(
|
||||
@@ -135,8 +134,7 @@ impl ParallelProof {
|
||||
hashed_address,
|
||||
"storage worker must return same address: expected {hashed_address}, got {addr}"
|
||||
);
|
||||
// Efficiently unwrap Arc: returns inner value if sole owner, clones otherwise.
|
||||
Arc::try_unwrap(proof).unwrap_or_else(|arc| (*arc).clone())
|
||||
proof
|
||||
}
|
||||
crate::proof_task::ProofResult::AccountMultiproof { .. } => {
|
||||
unreachable!("storage worker only sends StorageProof variant")
|
||||
@@ -225,12 +223,8 @@ impl ParallelProof {
|
||||
)
|
||||
})?;
|
||||
|
||||
// The proof is Arc-wrapped for efficient batch sharing, so we unwrap it here.
|
||||
let (multiproof, stats) = match proof_result_msg.result? {
|
||||
crate::proof_task::ProofResult::AccountMultiproof { proof, stats } => {
|
||||
// Efficiently unwrap Arc: returns inner value if sole owner, clones otherwise.
|
||||
(Arc::try_unwrap(proof).unwrap_or_else(|arc| (*arc).clone()), stats)
|
||||
}
|
||||
crate::proof_task::ProofResult::AccountMultiproof { proof, stats } => (proof, stats),
|
||||
crate::proof_task::ProofResult::StorageProof { .. } => {
|
||||
unreachable!("account worker only sends AccountMultiproof variant")
|
||||
}
|
||||
|
||||
@@ -41,7 +41,6 @@ use alloy_primitives::{
|
||||
use alloy_rlp::{BufMut, Encodable};
|
||||
use crossbeam_channel::{unbounded, Receiver as CrossbeamReceiver, Sender as CrossbeamSender};
|
||||
use dashmap::DashMap;
|
||||
use metrics::Histogram;
|
||||
use reth_execution_errors::{SparseTrieError, SparseTrieErrorKind};
|
||||
use reth_provider::{DatabaseProviderROFactory, ProviderError, ProviderResult};
|
||||
use reth_storage_errors::db::DatabaseError;
|
||||
@@ -80,275 +79,6 @@ use crate::proof_task_metrics::{
|
||||
type StorageProofResult = Result<DecodedStorageMultiProof, ParallelStateRootError>;
|
||||
type TrieNodeProviderResult = Result<Option<RevealedNode>, SparseTrieError>;
|
||||
|
||||
/// Maximum number of storage proof jobs to batch together per account.
|
||||
const STORAGE_PROOF_BATCH_LIMIT: usize = 32;
|
||||
|
||||
/// Maximum number of blinded node requests to defer during storage proof batching.
|
||||
/// When this limit is reached, batching stops early to process deferred nodes,
|
||||
/// preventing starvation of blinded node requests under high proof load.
|
||||
const MAX_DEFERRED_BLINDED_NODES: usize = 16;
|
||||
|
||||
/// Holds batched storage proof jobs for the same account.
|
||||
///
|
||||
/// When multiple storage proof requests arrive for the same account, they can be merged
|
||||
/// into a single proof computation with combined prefix sets and target slots.
|
||||
#[derive(Debug)]
|
||||
struct BatchedStorageProof {
|
||||
/// The merged prefix set from all batched jobs.
|
||||
prefix_set: PrefixSetMut,
|
||||
/// The merged target slots from all batched jobs.
|
||||
target_slots: B256Set,
|
||||
/// Whether any job requested branch node masks.
|
||||
with_branch_node_masks: bool,
|
||||
/// The `multi_added_removed_keys` from the first job (they should all share the same `Arc`).
|
||||
multi_added_removed_keys: Option<Arc<MultiAddedRemovedKeys>>,
|
||||
/// All senders that need to receive the result.
|
||||
senders: Vec<ProofResultContext>,
|
||||
}
|
||||
|
||||
impl BatchedStorageProof {
|
||||
/// Creates a new batch from the first storage proof input.
|
||||
fn new(input: StorageProofInput, sender: ProofResultContext) -> Self {
|
||||
// Convert frozen PrefixSet to mutable PrefixSetMut by collecting its keys.
|
||||
let prefix_set = PrefixSetMut::from(input.prefix_set.iter().copied());
|
||||
Self {
|
||||
prefix_set,
|
||||
target_slots: input.target_slots,
|
||||
with_branch_node_masks: input.with_branch_node_masks,
|
||||
multi_added_removed_keys: input.multi_added_removed_keys,
|
||||
senders: vec![sender],
|
||||
}
|
||||
}
|
||||
|
||||
/// Merges another storage proof job into this batch.
|
||||
///
|
||||
/// # Panics
|
||||
/// Panics if `input.multi_added_removed_keys` does not point to the same Arc as the batch's.
|
||||
/// This is a critical invariant for proof correctness.
|
||||
fn merge(&mut self, input: StorageProofInput, sender: ProofResultContext) {
|
||||
// Validate that all batched jobs share the same multi_added_removed_keys Arc.
|
||||
// This is a critical invariant: if jobs have different keys, the merged proof
|
||||
// would be computed with only the first job's keys, producing incorrect results.
|
||||
// Using assert! (not debug_assert!) because incorrect proofs could cause consensus
|
||||
// failures.
|
||||
assert!(
|
||||
match (&self.multi_added_removed_keys, &input.multi_added_removed_keys) {
|
||||
(Some(a), Some(b)) => Arc::ptr_eq(a, b),
|
||||
(None, None) => true,
|
||||
_ => false,
|
||||
},
|
||||
"All batched storage proof jobs must share the same multi_added_removed_keys Arc"
|
||||
);
|
||||
|
||||
self.prefix_set.extend_keys(input.prefix_set.iter().copied());
|
||||
self.target_slots.extend(input.target_slots);
|
||||
self.with_branch_node_masks |= input.with_branch_node_masks;
|
||||
self.senders.push(sender);
|
||||
}
|
||||
|
||||
/// Converts this batch into a single `StorageProofInput` for computation.
|
||||
fn into_input(self, hashed_address: B256) -> (StorageProofInput, Vec<ProofResultContext>) {
|
||||
let input = StorageProofInput {
|
||||
hashed_address,
|
||||
prefix_set: self.prefix_set.freeze(),
|
||||
target_slots: self.target_slots,
|
||||
with_branch_node_masks: self.with_branch_node_masks,
|
||||
multi_added_removed_keys: self.multi_added_removed_keys,
|
||||
};
|
||||
(input, self.senders)
|
||||
}
|
||||
}
|
||||
|
||||
/// Metrics for storage worker batching.
|
||||
#[derive(Clone, Default)]
|
||||
struct StorageWorkerBatchMetrics {
|
||||
/// Histogram of batch sizes (number of jobs merged per computation).
|
||||
#[cfg(feature = "metrics")]
|
||||
batch_size_histogram: Option<Histogram>,
|
||||
}
|
||||
|
||||
impl StorageWorkerBatchMetrics {
|
||||
#[cfg(feature = "metrics")]
|
||||
fn new() -> Self {
|
||||
Self {
|
||||
batch_size_histogram: Some(metrics::histogram!(
|
||||
"trie.proof_task.storage_worker_batch_size"
|
||||
)),
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(not(feature = "metrics"))]
|
||||
fn new() -> Self {
|
||||
Self {}
|
||||
}
|
||||
|
||||
fn record_batch_size(&self, _size: usize) {
|
||||
#[cfg(feature = "metrics")]
|
||||
if let Some(h) = &self.batch_size_histogram {
|
||||
h.record(_size as f64);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Maximum number of account multiproof jobs to batch together.
|
||||
const ACCOUNT_PROOF_BATCH_LIMIT: usize = 32;
|
||||
|
||||
/// Holds batched account multiproof jobs.
|
||||
///
|
||||
/// When multiple account multiproof requests arrive, they can be merged
|
||||
/// into a single proof computation with combined targets and prefix sets.
|
||||
#[derive(Debug)]
|
||||
struct BatchedAccountProof {
|
||||
/// The merged targets from all batched jobs.
|
||||
targets: MultiProofTargets,
|
||||
/// The merged account prefix set from all batched jobs.
|
||||
account_prefix_set: PrefixSetMut,
|
||||
/// The merged storage prefix sets from all batched jobs.
|
||||
storage_prefix_sets: B256Map<PrefixSetMut>,
|
||||
/// The merged destroyed accounts from all batched jobs.
|
||||
destroyed_accounts: B256Set,
|
||||
/// Whether any job requested branch node masks.
|
||||
collect_branch_node_masks: bool,
|
||||
/// The `multi_added_removed_keys` from the first job (they should all share the same `Arc`).
|
||||
multi_added_removed_keys: Option<Arc<MultiAddedRemovedKeys>>,
|
||||
/// The shared `missed_leaves_storage_roots` cache from the first job.
|
||||
missed_leaves_storage_roots: Arc<DashMap<B256, B256>>,
|
||||
/// All senders that need to receive the result.
|
||||
senders: Vec<ProofResultContext>,
|
||||
}
|
||||
|
||||
impl BatchedAccountProof {
|
||||
/// Creates a new batch from the first account multiproof input.
|
||||
fn new(input: AccountMultiproofInput) -> Self {
|
||||
// Convert frozen prefix sets to mutable versions.
|
||||
let account_prefix_set =
|
||||
PrefixSetMut::from(input.prefix_sets.account_prefix_set.iter().copied());
|
||||
let storage_prefix_sets = input
|
||||
.prefix_sets
|
||||
.storage_prefix_sets
|
||||
.into_iter()
|
||||
.map(|(addr, ps)| (addr, PrefixSetMut::from(ps.iter().copied())))
|
||||
.collect();
|
||||
let destroyed_accounts = input.prefix_sets.destroyed_accounts;
|
||||
|
||||
Self {
|
||||
targets: input.targets,
|
||||
account_prefix_set,
|
||||
storage_prefix_sets,
|
||||
destroyed_accounts,
|
||||
collect_branch_node_masks: input.collect_branch_node_masks,
|
||||
multi_added_removed_keys: input.multi_added_removed_keys,
|
||||
missed_leaves_storage_roots: input.missed_leaves_storage_roots,
|
||||
senders: vec![input.proof_result_sender],
|
||||
}
|
||||
}
|
||||
|
||||
/// Attempts to merge another account multiproof job into this batch.
|
||||
///
|
||||
/// Returns the job back if caches are incompatible so the caller can process it separately.
|
||||
fn try_merge(&mut self, input: AccountMultiproofInput) -> Result<(), AccountMultiproofInput> {
|
||||
// Require all jobs to share the same caches; otherwise merging would produce
|
||||
// incorrect proofs by reusing the wrong retained keys or missed-leaf storage roots.
|
||||
let multi_added_removed_keys_mismatch =
|
||||
!match (&self.multi_added_removed_keys, &input.multi_added_removed_keys) {
|
||||
(Some(a), Some(b)) => Arc::ptr_eq(a, b),
|
||||
(None, None) => true,
|
||||
_ => false,
|
||||
};
|
||||
|
||||
if multi_added_removed_keys_mismatch ||
|
||||
!Arc::ptr_eq(&self.missed_leaves_storage_roots, &input.missed_leaves_storage_roots)
|
||||
{
|
||||
return Err(input);
|
||||
}
|
||||
|
||||
// Merge targets.
|
||||
self.targets.extend(input.targets);
|
||||
|
||||
// Merge account prefix set.
|
||||
self.account_prefix_set.extend_keys(input.prefix_sets.account_prefix_set.iter().copied());
|
||||
|
||||
// Merge storage prefix sets.
|
||||
for (addr, ps) in input.prefix_sets.storage_prefix_sets {
|
||||
match self.storage_prefix_sets.entry(addr) {
|
||||
alloy_primitives::map::Entry::Occupied(mut entry) => {
|
||||
entry.get_mut().extend_keys(ps.iter().copied());
|
||||
}
|
||||
alloy_primitives::map::Entry::Vacant(entry) => {
|
||||
entry.insert(PrefixSetMut::from(ps.iter().copied()));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Merge destroyed accounts.
|
||||
self.destroyed_accounts.extend(input.prefix_sets.destroyed_accounts);
|
||||
|
||||
// OR the branch node masks flag.
|
||||
self.collect_branch_node_masks |= input.collect_branch_node_masks;
|
||||
|
||||
// Collect the sender.
|
||||
self.senders.push(input.proof_result_sender);
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Converts this batch into a single `AccountMultiproofInput` for computation.
|
||||
fn into_input(self) -> (AccountMultiproofInput, Vec<ProofResultContext>) {
|
||||
// Freeze the mutable prefix sets.
|
||||
let storage_prefix_sets: B256Map<PrefixSet> =
|
||||
self.storage_prefix_sets.into_iter().map(|(addr, ps)| (addr, ps.freeze())).collect();
|
||||
|
||||
let prefix_sets = TriePrefixSets {
|
||||
account_prefix_set: self.account_prefix_set.freeze(),
|
||||
storage_prefix_sets,
|
||||
destroyed_accounts: self.destroyed_accounts,
|
||||
};
|
||||
|
||||
// Use a dummy sender for the input since we'll handle all senders separately.
|
||||
let dummy_sender = self.senders.first().expect("batch always has at least one sender");
|
||||
let input = AccountMultiproofInput {
|
||||
targets: self.targets,
|
||||
prefix_sets,
|
||||
collect_branch_node_masks: self.collect_branch_node_masks,
|
||||
multi_added_removed_keys: self.multi_added_removed_keys,
|
||||
missed_leaves_storage_roots: self.missed_leaves_storage_roots,
|
||||
proof_result_sender: dummy_sender.clone(),
|
||||
};
|
||||
(input, self.senders)
|
||||
}
|
||||
}
|
||||
|
||||
/// Metrics for account worker batching.
|
||||
#[derive(Clone, Default)]
|
||||
struct AccountWorkerBatchMetrics {
|
||||
/// Histogram of batch sizes (number of jobs merged per computation).
|
||||
#[cfg(feature = "metrics")]
|
||||
batch_size_histogram: Option<Histogram>,
|
||||
}
|
||||
|
||||
impl AccountWorkerBatchMetrics {
|
||||
#[cfg(feature = "metrics")]
|
||||
fn new() -> Self {
|
||||
Self {
|
||||
batch_size_histogram: Some(metrics::histogram!(
|
||||
"trie.proof_task.account_worker_batch_size"
|
||||
)),
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(not(feature = "metrics"))]
|
||||
fn new() -> Self {
|
||||
Self {}
|
||||
}
|
||||
|
||||
fn record_batch_size(&self, _size: usize) {
|
||||
#[cfg(feature = "metrics")]
|
||||
if let Some(h) = &self.batch_size_histogram {
|
||||
h.record(_size as f64);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// A handle that provides type-safe access to proof worker pools.
|
||||
///
|
||||
/// The handle stores direct senders to both storage and account worker pools,
|
||||
@@ -822,16 +552,12 @@ impl TrieNodeProvider for ProofTaskTrieNodeProvider {
|
||||
}
|
||||
}
|
||||
/// Result of a proof calculation, which can be either an account multiproof or a storage proof.
|
||||
///
|
||||
/// The proof data is wrapped in `Arc` to enable efficient sharing when batching multiple
|
||||
/// proof requests. This avoids expensive cloning of the underlying proof structures
|
||||
/// when sending results to multiple receivers.
|
||||
#[derive(Debug, Clone)]
|
||||
#[derive(Debug)]
|
||||
pub enum ProofResult {
|
||||
/// Account multiproof with statistics
|
||||
AccountMultiproof {
|
||||
/// The account multiproof (Arc-wrapped for efficient sharing in batches)
|
||||
proof: Arc<DecodedMultiProof>,
|
||||
/// The account multiproof
|
||||
proof: DecodedMultiProof,
|
||||
/// Statistics collected during proof computation
|
||||
stats: ParallelTrieStats,
|
||||
},
|
||||
@@ -839,8 +565,8 @@ pub enum ProofResult {
|
||||
StorageProof {
|
||||
/// The hashed address this storage proof belongs to
|
||||
hashed_address: B256,
|
||||
/// The storage multiproof (Arc-wrapped for efficient sharing in batches)
|
||||
proof: Arc<DecodedStorageMultiProof>,
|
||||
/// The storage multiproof
|
||||
proof: DecodedStorageMultiProof,
|
||||
},
|
||||
}
|
||||
|
||||
@@ -849,17 +575,11 @@ impl ProofResult {
|
||||
///
|
||||
/// For account multiproofs, returns the multiproof directly (discarding stats).
|
||||
/// For storage proofs, wraps the storage proof into a minimal multiproof.
|
||||
///
|
||||
/// Note: This method clones the inner proof data. If you need to avoid the clone
|
||||
/// when you're the sole owner, consider using `Arc::try_unwrap` first.
|
||||
pub fn into_multiproof(self) -> DecodedMultiProof {
|
||||
match self {
|
||||
Self::AccountMultiproof { proof, stats: _ } => {
|
||||
Arc::try_unwrap(proof).unwrap_or_else(|arc| (*arc).clone())
|
||||
}
|
||||
Self::AccountMultiproof { proof, stats: _ } => proof,
|
||||
Self::StorageProof { hashed_address, proof } => {
|
||||
let storage_proof = Arc::try_unwrap(proof).unwrap_or_else(|arc| (*arc).clone());
|
||||
DecodedMultiProof::from_storage_proof(hashed_address, storage_proof)
|
||||
DecodedMultiProof::from_storage_proof(hashed_address, proof)
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -988,18 +708,11 @@ where
|
||||
/// 2. Advertises availability
|
||||
/// 3. Processes jobs in a loop:
|
||||
/// - Receives job from channel
|
||||
/// - Drains additional same-account storage proof jobs (batching)
|
||||
/// - Marks worker as busy
|
||||
/// - Processes the batched jobs as a single proof computation
|
||||
/// - Processes the job
|
||||
/// - Marks worker as available
|
||||
/// 4. Shuts down when channel closes
|
||||
///
|
||||
/// # Batching Strategy
|
||||
///
|
||||
/// When multiple storage proof requests arrive for the same account, they are merged
|
||||
/// into a single proof computation. This reduces redundant trie traversals when state
|
||||
/// updates arrive faster than proof computation can process them.
|
||||
///
|
||||
/// # Panic Safety
|
||||
///
|
||||
/// If this function panics, the worker thread terminates but other workers
|
||||
@@ -1019,7 +732,6 @@ where
|
||||
// Create provider from factory
|
||||
let provider = task_ctx.factory.database_provider_ro()?;
|
||||
let proof_tx = ProofTaskTx::new(provider, worker_id);
|
||||
let batch_metrics = StorageWorkerBatchMetrics::new();
|
||||
|
||||
trace!(
|
||||
target: "trie::proof_task",
|
||||
@@ -1034,104 +746,20 @@ where
|
||||
// Initially mark this worker as available.
|
||||
available_workers.fetch_add(1, Ordering::Relaxed);
|
||||
|
||||
// Deferred blinded node jobs to process after batched storage proofs.
|
||||
// Pre-allocate with capacity to avoid reallocations during batching.
|
||||
let mut deferred_blinded_nodes: Vec<(B256, Nibbles, Sender<TrieNodeProviderResult>)> =
|
||||
Vec::with_capacity(MAX_DEFERRED_BLINDED_NODES);
|
||||
|
||||
while let Ok(job) = work_rx.recv() {
|
||||
// Mark worker as busy.
|
||||
available_workers.fetch_sub(1, Ordering::Relaxed);
|
||||
|
||||
match job {
|
||||
StorageWorkerJob::StorageProof { input, proof_result_sender } => {
|
||||
// Start batching: group storage proofs by account.
|
||||
let mut batches: B256Map<BatchedStorageProof> = B256Map::default();
|
||||
batches.insert(
|
||||
input.hashed_address,
|
||||
BatchedStorageProof::new(input, proof_result_sender),
|
||||
Self::process_storage_proof(
|
||||
worker_id,
|
||||
&proof_tx,
|
||||
input,
|
||||
proof_result_sender,
|
||||
&mut storage_proofs_processed,
|
||||
&mut cursor_metrics_cache,
|
||||
);
|
||||
let mut total_jobs = 1usize;
|
||||
|
||||
// Drain additional jobs from the queue.
|
||||
while total_jobs < STORAGE_PROOF_BATCH_LIMIT {
|
||||
match work_rx.try_recv() {
|
||||
Ok(StorageWorkerJob::StorageProof {
|
||||
input: next_input,
|
||||
proof_result_sender: next_sender,
|
||||
}) => {
|
||||
total_jobs += 1;
|
||||
let addr = next_input.hashed_address;
|
||||
match batches.entry(addr) {
|
||||
alloy_primitives::map::Entry::Occupied(mut entry) => {
|
||||
entry.get_mut().merge(next_input, next_sender);
|
||||
}
|
||||
alloy_primitives::map::Entry::Vacant(entry) => {
|
||||
entry.insert(BatchedStorageProof::new(
|
||||
next_input,
|
||||
next_sender,
|
||||
));
|
||||
}
|
||||
}
|
||||
}
|
||||
Ok(StorageWorkerJob::BlindedStorageNode {
|
||||
account,
|
||||
path,
|
||||
result_sender,
|
||||
}) => {
|
||||
// Defer blinded node jobs to process after batched proofs.
|
||||
deferred_blinded_nodes.push((account, path, result_sender));
|
||||
// Stop batching if too many blinded nodes are deferred to prevent
|
||||
// starvation.
|
||||
if deferred_blinded_nodes.len() >= MAX_DEFERRED_BLINDED_NODES {
|
||||
break;
|
||||
}
|
||||
}
|
||||
Err(_) => break,
|
||||
}
|
||||
}
|
||||
|
||||
// Process all batched storage proofs.
|
||||
for (hashed_address, batch) in batches {
|
||||
let batch_size = batch.senders.len();
|
||||
batch_metrics.record_batch_size(batch_size);
|
||||
|
||||
let (merged_input, senders) = batch.into_input(hashed_address);
|
||||
|
||||
trace!(
|
||||
target: "trie::proof_task",
|
||||
worker_id,
|
||||
?hashed_address,
|
||||
batch_size,
|
||||
prefix_set_len = merged_input.prefix_set.len(),
|
||||
target_slots_len = merged_input.target_slots.len(),
|
||||
"Processing batched storage proof"
|
||||
);
|
||||
|
||||
Self::process_batched_storage_proof(
|
||||
worker_id,
|
||||
&proof_tx,
|
||||
hashed_address,
|
||||
merged_input,
|
||||
senders,
|
||||
&mut storage_proofs_processed,
|
||||
&mut cursor_metrics_cache,
|
||||
);
|
||||
}
|
||||
|
||||
// Process any deferred blinded node jobs.
|
||||
for (account, path, result_sender) in
|
||||
std::mem::take(&mut deferred_blinded_nodes)
|
||||
{
|
||||
Self::process_blinded_node(
|
||||
worker_id,
|
||||
&proof_tx,
|
||||
account,
|
||||
path,
|
||||
result_sender,
|
||||
&mut storage_nodes_processed,
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
StorageWorkerJob::BlindedStorageNode { account, path, result_sender } => {
|
||||
@@ -1167,103 +795,82 @@ where
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Processes a batched storage proof request and sends results to all waiting receivers.
|
||||
///
|
||||
/// This computes a single storage proof with merged targets and sends the same result
|
||||
/// to all original requestors, reducing redundant trie traversals.
|
||||
fn process_batched_storage_proof<Provider>(
|
||||
/// Processes a storage proof request.
|
||||
fn process_storage_proof<Provider>(
|
||||
worker_id: usize,
|
||||
proof_tx: &ProofTaskTx<Provider>,
|
||||
hashed_address: B256,
|
||||
input: StorageProofInput,
|
||||
senders: Vec<ProofResultContext>,
|
||||
proof_result_sender: ProofResultContext,
|
||||
storage_proofs_processed: &mut u64,
|
||||
cursor_metrics_cache: &mut ProofTaskCursorMetricsCache,
|
||||
) where
|
||||
Provider: TrieCursorFactory + HashedCursorFactory,
|
||||
{
|
||||
let hashed_address = input.hashed_address;
|
||||
let ProofResultContext { sender, sequence_number: seq, state, start_time } =
|
||||
proof_result_sender;
|
||||
|
||||
let mut trie_cursor_metrics = TrieCursorMetricsCache::default();
|
||||
let mut hashed_cursor_metrics = HashedCursorMetricsCache::default();
|
||||
|
||||
trace!(
|
||||
target: "trie::proof_task",
|
||||
worker_id,
|
||||
hashed_address = ?hashed_address,
|
||||
prefix_set_len = input.prefix_set.len(),
|
||||
target_slots_len = input.target_slots.len(),
|
||||
"Processing storage proof"
|
||||
);
|
||||
|
||||
let proof_start = Instant::now();
|
||||
let result = proof_tx.compute_storage_proof(
|
||||
input,
|
||||
&mut trie_cursor_metrics,
|
||||
&mut hashed_cursor_metrics,
|
||||
);
|
||||
|
||||
let proof_elapsed = proof_start.elapsed();
|
||||
*storage_proofs_processed += 1;
|
||||
|
||||
// Send the result to all waiting receivers.
|
||||
let num_senders = senders.len();
|
||||
match result {
|
||||
Ok(storage_proof) => {
|
||||
// Success case: wrap proof in Arc for efficient sharing across all senders.
|
||||
let proof_result =
|
||||
ProofResult::StorageProof { hashed_address, proof: Arc::new(storage_proof) };
|
||||
let result_msg = result.map(|storage_proof| ProofResult::StorageProof {
|
||||
hashed_address,
|
||||
proof: storage_proof,
|
||||
});
|
||||
|
||||
for ProofResultContext { sender, sequence_number, state, start_time } in senders {
|
||||
*storage_proofs_processed += 1;
|
||||
|
||||
if sender
|
||||
.send(ProofResultMessage {
|
||||
sequence_number,
|
||||
result: Ok(proof_result.clone()),
|
||||
elapsed: start_time.elapsed(),
|
||||
state,
|
||||
})
|
||||
.is_err()
|
||||
{
|
||||
trace!(
|
||||
target: "trie::proof_task",
|
||||
worker_id,
|
||||
?hashed_address,
|
||||
sequence_number,
|
||||
"Proof result receiver dropped, discarding result"
|
||||
);
|
||||
}
|
||||
}
|
||||
}
|
||||
Err(error) => {
|
||||
// Error case: convert to string for cloning, then send to all receivers.
|
||||
let error_msg = error.to_string();
|
||||
|
||||
for ProofResultContext { sender, sequence_number, state, start_time } in senders {
|
||||
*storage_proofs_processed += 1;
|
||||
|
||||
if sender
|
||||
.send(ProofResultMessage {
|
||||
sequence_number,
|
||||
result: Err(ParallelStateRootError::Other(error_msg.clone())),
|
||||
elapsed: start_time.elapsed(),
|
||||
state,
|
||||
})
|
||||
.is_err()
|
||||
{
|
||||
trace!(
|
||||
target: "trie::proof_task",
|
||||
worker_id,
|
||||
?hashed_address,
|
||||
sequence_number,
|
||||
"Proof result receiver dropped, discarding result"
|
||||
);
|
||||
}
|
||||
}
|
||||
}
|
||||
if sender
|
||||
.send(ProofResultMessage {
|
||||
sequence_number: seq,
|
||||
result: result_msg,
|
||||
elapsed: start_time.elapsed(),
|
||||
state,
|
||||
})
|
||||
.is_err()
|
||||
{
|
||||
trace!(
|
||||
target: "trie::proof_task",
|
||||
worker_id,
|
||||
hashed_address = ?hashed_address,
|
||||
storage_proofs_processed,
|
||||
"Proof result receiver dropped, discarding result"
|
||||
);
|
||||
}
|
||||
|
||||
trace!(
|
||||
target: "trie::proof_task",
|
||||
worker_id,
|
||||
?hashed_address,
|
||||
hashed_address = ?hashed_address,
|
||||
proof_time_us = proof_elapsed.as_micros(),
|
||||
num_senders,
|
||||
total_processed = storage_proofs_processed,
|
||||
trie_cursor_duration_us = trie_cursor_metrics.total_duration.as_micros(),
|
||||
hashed_cursor_duration_us = hashed_cursor_metrics.total_duration.as_micros(),
|
||||
"Batched storage proof completed"
|
||||
?trie_cursor_metrics,
|
||||
?hashed_cursor_metrics,
|
||||
"Storage proof completed"
|
||||
);
|
||||
|
||||
#[cfg(feature = "metrics")]
|
||||
{
|
||||
// Accumulate per-proof metrics into the worker's cache
|
||||
let per_proof_cache = ProofTaskCursorMetricsCache {
|
||||
account_trie_cursor: TrieCursorMetricsCache::default(),
|
||||
account_hashed_cursor: HashedCursorMetricsCache::default(),
|
||||
@@ -1380,18 +987,11 @@ where
|
||||
/// 2. Advertises availability
|
||||
/// 3. Processes jobs in a loop:
|
||||
/// - Receives job from channel
|
||||
/// - Drains additional account multiproof jobs (batching)
|
||||
/// - Marks worker as busy
|
||||
/// - Processes the batched jobs as a single proof computation
|
||||
/// - Processes the job
|
||||
/// - Marks worker as available
|
||||
/// 4. Shuts down when channel closes
|
||||
///
|
||||
/// # Batching Strategy
|
||||
///
|
||||
/// When multiple account multiproof requests arrive, they are merged into
|
||||
/// a single proof computation. This reduces redundant trie traversals when
|
||||
/// state updates arrive faster than proof computation can process them.
|
||||
///
|
||||
/// # Panic Safety
|
||||
///
|
||||
/// If this function panics, the worker thread terminates but other workers
|
||||
@@ -1412,7 +1012,6 @@ where
|
||||
// Create provider from factory
|
||||
let provider = task_ctx.factory.database_provider_ro()?;
|
||||
let proof_tx = ProofTaskTx::new(provider, worker_id);
|
||||
let batch_metrics = AccountWorkerBatchMetrics::new();
|
||||
|
||||
trace!(
|
||||
target: "trie::proof_task",
|
||||
@@ -1427,98 +1026,20 @@ where
|
||||
// Count this worker as available only after successful initialization.
|
||||
available_workers.fetch_add(1, Ordering::Relaxed);
|
||||
|
||||
// Deferred blinded node jobs to process after batched account proofs.
|
||||
// Pre-allocate with capacity to avoid reallocations during batching.
|
||||
let mut deferred_blinded_nodes: Vec<(Nibbles, Sender<TrieNodeProviderResult>)> =
|
||||
Vec::with_capacity(MAX_DEFERRED_BLINDED_NODES);
|
||||
|
||||
while let Ok(job) = work_rx.recv() {
|
||||
// Mark worker as busy.
|
||||
available_workers.fetch_sub(1, Ordering::Relaxed);
|
||||
|
||||
match job {
|
||||
AccountWorkerJob::AccountMultiproof { input } => {
|
||||
// Start batching: accumulate account multiproof jobs. If we encounter an
|
||||
// incompatible job (different caches), process it as a separate batch.
|
||||
let mut next_account_job: Option<Box<AccountMultiproofInput>> = Some(input);
|
||||
|
||||
while let Some(account_job) = next_account_job.take() {
|
||||
let mut batch = BatchedAccountProof::new(*account_job);
|
||||
let mut pending_incompatible: Option<Box<AccountMultiproofInput>> = None;
|
||||
|
||||
// Drain additional jobs from the queue.
|
||||
while batch.senders.len() < ACCOUNT_PROOF_BATCH_LIMIT {
|
||||
match work_rx.try_recv() {
|
||||
Ok(AccountWorkerJob::AccountMultiproof { input: next_input }) => {
|
||||
match batch.try_merge(*next_input) {
|
||||
Ok(()) => {}
|
||||
Err(incompatible) => {
|
||||
trace!(
|
||||
target: "trie::proof_task",
|
||||
worker_id,
|
||||
"Account multiproof batch split due to incompatible caches"
|
||||
);
|
||||
pending_incompatible = Some(Box::new(incompatible));
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
Ok(AccountWorkerJob::BlindedAccountNode {
|
||||
path,
|
||||
result_sender,
|
||||
}) => {
|
||||
// Defer blinded node jobs to process after batched proofs.
|
||||
deferred_blinded_nodes.push((path, result_sender));
|
||||
// Stop batching if too many blinded nodes are deferred to
|
||||
// prevent starvation.
|
||||
if deferred_blinded_nodes.len() >= MAX_DEFERRED_BLINDED_NODES {
|
||||
break;
|
||||
}
|
||||
}
|
||||
Err(_) => break,
|
||||
}
|
||||
}
|
||||
|
||||
let batch_size = batch.senders.len();
|
||||
batch_metrics.record_batch_size(batch_size);
|
||||
|
||||
let (merged_input, senders) = batch.into_input();
|
||||
|
||||
trace!(
|
||||
target: "trie::proof_task",
|
||||
worker_id,
|
||||
batch_size,
|
||||
targets_len = merged_input.targets.len(),
|
||||
"Processing batched account multiproof"
|
||||
);
|
||||
|
||||
Self::process_batched_account_multiproof(
|
||||
worker_id,
|
||||
&proof_tx,
|
||||
&storage_work_tx,
|
||||
merged_input,
|
||||
senders,
|
||||
&mut account_proofs_processed,
|
||||
&mut cursor_metrics_cache,
|
||||
);
|
||||
|
||||
// If we encountered an incompatible job, process it as its own batch
|
||||
// before handling any deferred blinded node requests.
|
||||
if let Some(incompatible_job) = pending_incompatible {
|
||||
next_account_job = Some(incompatible_job);
|
||||
}
|
||||
}
|
||||
|
||||
// Process any deferred blinded node jobs.
|
||||
for (path, result_sender) in std::mem::take(&mut deferred_blinded_nodes) {
|
||||
Self::process_blinded_node(
|
||||
worker_id,
|
||||
&proof_tx,
|
||||
path,
|
||||
result_sender,
|
||||
&mut account_nodes_processed,
|
||||
);
|
||||
}
|
||||
Self::process_account_multiproof(
|
||||
worker_id,
|
||||
&proof_tx,
|
||||
storage_work_tx.clone(),
|
||||
*input,
|
||||
&mut account_proofs_processed,
|
||||
&mut cursor_metrics_cache,
|
||||
);
|
||||
}
|
||||
|
||||
AccountWorkerJob::BlindedAccountNode { path, result_sender } => {
|
||||
@@ -1553,16 +1074,12 @@ where
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Processes a batched account multiproof request and sends results to all waiting receivers.
|
||||
///
|
||||
/// This computes a single account multiproof with merged targets and sends the same result
|
||||
/// to all original requestors, reducing redundant trie traversals.
|
||||
fn process_batched_account_multiproof<Provider>(
|
||||
/// Processes an account multiproof request.
|
||||
fn process_account_multiproof<Provider>(
|
||||
worker_id: usize,
|
||||
proof_tx: &ProofTaskTx<Provider>,
|
||||
storage_work_tx: &CrossbeamSender<StorageWorkerJob>,
|
||||
storage_work_tx: CrossbeamSender<StorageWorkerJob>,
|
||||
input: AccountMultiproofInput,
|
||||
senders: Vec<ProofResultContext>,
|
||||
account_proofs_processed: &mut u64,
|
||||
cursor_metrics_cache: &mut ProofTaskCursorMetricsCache,
|
||||
) where
|
||||
@@ -1574,21 +1091,21 @@ where
|
||||
collect_branch_node_masks,
|
||||
multi_added_removed_keys,
|
||||
missed_leaves_storage_roots,
|
||||
proof_result_sender: _, // We use the senders vec instead
|
||||
proof_result_sender:
|
||||
ProofResultContext { sender: result_tx, sequence_number: seq, state, start_time: start },
|
||||
} = input;
|
||||
|
||||
let span = debug_span!(
|
||||
target: "trie::proof_task",
|
||||
"Batched account multiproof calculation",
|
||||
"Account multiproof calculation",
|
||||
targets = targets.len(),
|
||||
batch_size = senders.len(),
|
||||
worker_id,
|
||||
);
|
||||
let _span_guard = span.enter();
|
||||
|
||||
trace!(
|
||||
target: "trie::proof_task",
|
||||
"Processing batched account multiproof"
|
||||
"Processing account multiproof"
|
||||
);
|
||||
|
||||
let proof_start = Instant::now();
|
||||
@@ -1603,7 +1120,7 @@ where
|
||||
tracker.set_precomputed_storage_roots(storage_root_targets_len as u64);
|
||||
|
||||
let storage_proof_receivers = match dispatch_storage_proofs(
|
||||
storage_work_tx,
|
||||
&storage_work_tx,
|
||||
&targets,
|
||||
&mut storage_prefix_sets,
|
||||
collect_branch_node_masks,
|
||||
@@ -1611,17 +1128,14 @@ where
|
||||
) {
|
||||
Ok(receivers) => receivers,
|
||||
Err(error) => {
|
||||
// Send error to all receivers
|
||||
let error_msg = error.to_string();
|
||||
for ProofResultContext { sender, sequence_number, state, start_time } in senders {
|
||||
*account_proofs_processed += 1;
|
||||
let _ = sender.send(ProofResultMessage {
|
||||
sequence_number,
|
||||
result: Err(ParallelStateRootError::Other(error_msg.clone())),
|
||||
elapsed: start_time.elapsed(),
|
||||
state,
|
||||
});
|
||||
}
|
||||
// Send error through result channel
|
||||
error!(target: "trie::proof_task", "Failed to dispatch storage proofs: {error}");
|
||||
let _ = result_tx.send(ProofResultMessage {
|
||||
sequence_number: seq,
|
||||
result: Err(error),
|
||||
elapsed: start.elapsed(),
|
||||
state,
|
||||
});
|
||||
return;
|
||||
}
|
||||
};
|
||||
@@ -1642,75 +1156,46 @@ where
|
||||
build_account_multiproof_with_storage_roots(&proof_tx.provider, ctx, &mut tracker);
|
||||
|
||||
let proof_elapsed = proof_start.elapsed();
|
||||
let total_elapsed = start.elapsed();
|
||||
let proof_cursor_metrics = tracker.cursor_metrics;
|
||||
proof_cursor_metrics.record_spans();
|
||||
|
||||
let stats = tracker.finish();
|
||||
let result = result.map(|proof| ProofResult::AccountMultiproof { proof, stats });
|
||||
*account_proofs_processed += 1;
|
||||
|
||||
// Send the result to all waiting receivers.
|
||||
let num_senders = senders.len();
|
||||
match result {
|
||||
Ok(proof) => {
|
||||
// Success case: wrap proof in Arc for efficient sharing across all senders.
|
||||
let proof_result = ProofResult::AccountMultiproof { proof: Arc::new(proof), stats };
|
||||
|
||||
for ProofResultContext { sender, sequence_number, state, start_time } in senders {
|
||||
*account_proofs_processed += 1;
|
||||
|
||||
if sender
|
||||
.send(ProofResultMessage {
|
||||
sequence_number,
|
||||
result: Ok(proof_result.clone()),
|
||||
elapsed: start_time.elapsed(),
|
||||
state,
|
||||
})
|
||||
.is_err()
|
||||
{
|
||||
trace!(
|
||||
target: "trie::proof_task",
|
||||
worker_id,
|
||||
sequence_number,
|
||||
"Account multiproof receiver dropped, discarding result"
|
||||
);
|
||||
}
|
||||
}
|
||||
}
|
||||
Err(error) => {
|
||||
// Error case: convert to string for cloning, then send to all receivers.
|
||||
let error_msg = error.to_string();
|
||||
|
||||
for ProofResultContext { sender, sequence_number, state, start_time } in senders {
|
||||
*account_proofs_processed += 1;
|
||||
|
||||
if sender
|
||||
.send(ProofResultMessage {
|
||||
sequence_number,
|
||||
result: Err(ParallelStateRootError::Other(error_msg.clone())),
|
||||
elapsed: start_time.elapsed(),
|
||||
state,
|
||||
})
|
||||
.is_err()
|
||||
{
|
||||
trace!(
|
||||
target: "trie::proof_task",
|
||||
worker_id,
|
||||
sequence_number,
|
||||
"Account multiproof receiver dropped, discarding result"
|
||||
);
|
||||
}
|
||||
}
|
||||
}
|
||||
// Send result to MultiProofTask
|
||||
if result_tx
|
||||
.send(ProofResultMessage {
|
||||
sequence_number: seq,
|
||||
result,
|
||||
elapsed: total_elapsed,
|
||||
state,
|
||||
})
|
||||
.is_err()
|
||||
{
|
||||
trace!(
|
||||
target: "trie::proof_task",
|
||||
worker_id,
|
||||
account_proofs_processed,
|
||||
"Account multiproof receiver dropped, discarding result"
|
||||
);
|
||||
}
|
||||
|
||||
trace!(
|
||||
target: "trie::proof_task",
|
||||
proof_time_us = proof_elapsed.as_micros(),
|
||||
num_senders,
|
||||
total_elapsed_us = total_elapsed.as_micros(),
|
||||
total_processed = account_proofs_processed,
|
||||
account_trie_cursor_duration_us = proof_cursor_metrics.account_trie_cursor.total_duration.as_micros(),
|
||||
account_hashed_cursor_duration_us = proof_cursor_metrics.account_hashed_cursor.total_duration.as_micros(),
|
||||
storage_trie_cursor_duration_us = proof_cursor_metrics.storage_trie_cursor.total_duration.as_micros(),
|
||||
storage_hashed_cursor_duration_us = proof_cursor_metrics.storage_hashed_cursor.total_duration.as_micros(),
|
||||
"Batched account multiproof completed"
|
||||
account_trie_cursor_metrics = ?proof_cursor_metrics.account_trie_cursor,
|
||||
account_hashed_cursor_metrics = ?proof_cursor_metrics.account_hashed_cursor,
|
||||
storage_trie_cursor_metrics = ?proof_cursor_metrics.storage_trie_cursor,
|
||||
storage_hashed_cursor_metrics = ?proof_cursor_metrics.storage_hashed_cursor,
|
||||
"Account multiproof completed"
|
||||
);
|
||||
|
||||
#[cfg(feature = "metrics")]
|
||||
@@ -1853,9 +1338,7 @@ where
|
||||
|
||||
drop(_guard);
|
||||
|
||||
// Extract storage proof from the result.
|
||||
// The proof is Arc-wrapped for efficient batch sharing, so we unwrap it
|
||||
// here.
|
||||
// Extract storage proof from the result
|
||||
let proof = match proof_msg.result? {
|
||||
ProofResult::StorageProof { hashed_address: addr, proof } => {
|
||||
debug_assert_eq!(
|
||||
@@ -1863,9 +1346,7 @@ where
|
||||
hashed_address,
|
||||
"storage worker must return same address: expected {hashed_address}, got {addr}"
|
||||
);
|
||||
// Efficiently unwrap Arc: returns inner value if sole owner, clones
|
||||
// otherwise.
|
||||
Arc::try_unwrap(proof).unwrap_or_else(|arc| (*arc).clone())
|
||||
proof
|
||||
}
|
||||
ProofResult::AccountMultiproof { .. } => {
|
||||
unreachable!("storage worker only sends StorageProof variant")
|
||||
@@ -1928,11 +1409,8 @@ where
|
||||
// Consume remaining storage proof receivers for accounts not encountered during trie walk.
|
||||
for (hashed_address, receiver) in storage_proof_receivers {
|
||||
if let Ok(proof_msg) = receiver.recv() {
|
||||
// Extract storage proof from the result.
|
||||
// The proof is Arc-wrapped for efficient batch sharing, so we unwrap it here.
|
||||
// Extract storage proof from the result
|
||||
if let Ok(ProofResult::StorageProof { proof, .. }) = proof_msg.result {
|
||||
// Efficiently unwrap Arc: returns inner value if sole owner, clones otherwise.
|
||||
let proof = Arc::try_unwrap(proof).unwrap_or_else(|arc| (*arc).clone());
|
||||
collected_decoded_storages.insert(hashed_address, proof);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -6,7 +6,7 @@ Gets the content of a database table for the given key
|
||||
$ op-reth db get mdbx --help
|
||||
```
|
||||
```txt
|
||||
Usage: op-reth db get mdbx [OPTIONS] <TABLE> <KEY> [SUBKEY]
|
||||
Usage: op-reth db get mdbx [OPTIONS] <TABLE> <KEY> [SUBKEY] [END_KEY] [END_SUBKEY]
|
||||
|
||||
Arguments:
|
||||
<TABLE>
|
||||
@@ -18,6 +18,12 @@ Arguments:
|
||||
[SUBKEY]
|
||||
The subkey to get content for
|
||||
|
||||
[END_KEY]
|
||||
Optional end key for range query (exclusive upper bound)
|
||||
|
||||
[END_SUBKEY]
|
||||
Optional end subkey for range query (exclusive upper bound)
|
||||
|
||||
Options:
|
||||
--raw
|
||||
Output bytes instead of human-readable decoded value
|
||||
|
||||
@@ -6,7 +6,7 @@ Gets the content of a database table for the given key
|
||||
$ reth db get mdbx --help
|
||||
```
|
||||
```txt
|
||||
Usage: reth db get mdbx [OPTIONS] <TABLE> <KEY> [SUBKEY]
|
||||
Usage: reth db get mdbx [OPTIONS] <TABLE> <KEY> [SUBKEY] [END_KEY] [END_SUBKEY]
|
||||
|
||||
Arguments:
|
||||
<TABLE>
|
||||
@@ -18,6 +18,12 @@ Arguments:
|
||||
[SUBKEY]
|
||||
The subkey to get content for
|
||||
|
||||
[END_KEY]
|
||||
Optional end key for range query (exclusive upper bound)
|
||||
|
||||
[END_SUBKEY]
|
||||
Optional end subkey for range query (exclusive upper bound)
|
||||
|
||||
Options:
|
||||
--raw
|
||||
Output bytes instead of human-readable decoded value
|
||||
|
||||
@@ -114,6 +114,54 @@ These include general information about the node itself, as well as what protoco
|
||||
}
|
||||
```
|
||||
|
||||
## `admin_peers`
|
||||
|
||||
Returns information about peers currently known to the node.
|
||||
|
||||
| Client | Method invocation |
|
||||
| ------ | ------------------------------ |
|
||||
| RPC | `{"method": "admin_peers", "params": []}` |
|
||||
|
||||
### Example
|
||||
|
||||
```js
|
||||
// > {"jsonrpc":"2.0","id":1,"method":"admin_peers","params":[]}
|
||||
{"jsonrpc":"2.0","id":1,"result":[
|
||||
{
|
||||
"id":"44826a5d6a55f88a18298bca4773fca...",
|
||||
"name":"reth/v0.0.1/x86_64-unknown-linux-gnu",
|
||||
"enode":"enode://44826a5d6a55f88a18298bca4773fca5749cdc3a5c9f308aa7d810e9b31123f3e7c5fba0b1d70aac5308426f47df2a128a6747040a3815cc7dd7167d03be320d@192.168.1.1:30303",
|
||||
"enr":"enr:-IS4QHCYr...",
|
||||
"caps":["eth/67"],
|
||||
"network":{
|
||||
"remoteAddress":"192.168.1.1:30303",
|
||||
"localAddress":"127.0.0.1:30303",
|
||||
"inbound":false,
|
||||
"trusted":false,
|
||||
"staticNode":false
|
||||
},
|
||||
"protocols":{
|
||||
"eth":{"version":67}
|
||||
}
|
||||
}
|
||||
]}
|
||||
```
|
||||
|
||||
## `admin_clearTxpool`
|
||||
|
||||
Clears all transactions from the transaction pool. Returns the number of removed transactions.
|
||||
|
||||
| Client | Method invocation |
|
||||
| ------ | ----------------------------------------- |
|
||||
| RPC | `{"method": "admin_clearTxpool", "params": []}` |
|
||||
|
||||
### Example
|
||||
|
||||
```js
|
||||
// > {"jsonrpc":"2.0","id":1,"method":"admin_clearTxpool","params":[]}
|
||||
{"jsonrpc":"2.0","id":1,"result":42}
|
||||
```
|
||||
|
||||
## `admin_peerEvents`, `admin_peerEvents_unsubscribe`
|
||||
|
||||
Subscribe to events received by peers over the network. This creates a subscription that emits notifications about peer connections and disconnections.
|
||||
|
||||
@@ -176,9 +176,13 @@ The second parameter is an array of one or more trace types (`vmTrace`, `trace`,
|
||||
|
||||
The third and optional parameter is a block number, block hash, or a block tag (`latest`, `finalized`, `safe`, `earliest`, `pending`).
|
||||
|
||||
| Client | Method invocation |
|
||||
| ------ | -------------------------------------------------------------- |
|
||||
| RPC | `{"method": "trace_call", "params": [tx, trace[], block]}` |
|
||||
The fourth and optional parameter is a `stateOverrides` object that temporarily overrides account state used for the trace (balances, nonces, code, storage).
|
||||
|
||||
The fifth and optional parameter is a `blockOverrides` object that temporarily overrides block fields used for the trace (for example `timestamp`, `baseFee`, `number`).
|
||||
|
||||
| Client | Method invocation |
|
||||
| ------ | ------------------------------------------------------------------------------------ |
|
||||
| RPC | `{"method": "trace_call", "params": [tx, trace[], block, stateOverrides, blockOverrides]}` |
|
||||
|
||||
### Example
|
||||
|
||||
|
||||
@@ -406,9 +406,10 @@ No pruning, run as archive node.
|
||||
This configuration will:
|
||||
|
||||
- Run pruning every 5 blocks
|
||||
- Continuously prune all transaction senders, account history and storage history before the block `head-100_000`,
|
||||
- Continuously prune all transaction senders, account history, storage history and bodies history before the block `head-100_000`,
|
||||
i.e. keep the data for the last `100_000` blocks
|
||||
- Prune all receipts before the block 1920000, i.e. keep receipts from the block 1920000
|
||||
- Keep the last 128 blocks of merkle changesets (default behavior)
|
||||
|
||||
```toml
|
||||
[prune]
|
||||
@@ -430,6 +431,14 @@ account_history = { distance = 100_000 } # Prune all historical account states b
|
||||
|
||||
# Storage History pruning configuration
|
||||
storage_history = { distance = 100_000 } # Prune all historical storage states before the block `head-100000`
|
||||
|
||||
# Bodies History pruning configuration
|
||||
bodies_history = { distance = 100_000 } # Prune all historical block bodies before the block `head-100000`
|
||||
|
||||
# Merkle Changesets pruning configuration
|
||||
# Controls pruning of AccountsTrieChangeSets and StoragesTrieChangeSets.
|
||||
# Default: { distance = 128 } - keeps the last 128 blocks of merkle changesets
|
||||
merkle_changesets = { distance = 128 }
|
||||
```
|
||||
|
||||
We can also prune receipts more granular, using the logs filtering:
|
||||
|
||||
14
examples/custom-rpc-middleware/Cargo.toml
Normal file
14
examples/custom-rpc-middleware/Cargo.toml
Normal file
@@ -0,0 +1,14 @@
|
||||
[package]
|
||||
name = "example-custom-rpc-middleware"
|
||||
version = "0.0.0"
|
||||
publish = false
|
||||
edition.workspace = true
|
||||
license.workspace = true
|
||||
|
||||
[dependencies]
|
||||
reth-ethereum = { workspace = true, features = ["node", "rpc", "cli"] }
|
||||
|
||||
clap = { workspace = true, features = ["derive"] }
|
||||
jsonrpsee = { workspace = true, features = ["server", "macros"] }
|
||||
tracing.workspace = true
|
||||
tower.workspace = true
|
||||
117
examples/custom-rpc-middleware/src/main.rs
Normal file
117
examples/custom-rpc-middleware/src/main.rs
Normal file
@@ -0,0 +1,117 @@
|
||||
//! Example of how to create a node with custom middleware that alters a returned error message from
|
||||
//! the RPC
|
||||
//!
|
||||
//! Run with
|
||||
//!
|
||||
//! ```sh
|
||||
//! cargo run -p example-custom-rpc-middleware node --http --dev --dev.block-time 12s --http.api=debug,eth
|
||||
//! ```
|
||||
//!
|
||||
//! Then make an RPC request that will result in an error
|
||||
//!
|
||||
//! ```sh
|
||||
//! curl -s -X POST http://localhost:8545 \
|
||||
//! -H "Content-Type: application/json" \
|
||||
//! -d '{
|
||||
//! "jsonrpc": "2.0",
|
||||
//! "method": "debug_getRawBlock",
|
||||
//! "params": ["2"],
|
||||
//! "id": 1
|
||||
//! }' | jq
|
||||
//! ```
|
||||
|
||||
use clap::Parser;
|
||||
use jsonrpsee::{
|
||||
core::{
|
||||
middleware::{Batch, Notification, RpcServiceT},
|
||||
server::MethodResponse,
|
||||
},
|
||||
types::{ErrorObjectOwned, Id, Request},
|
||||
};
|
||||
use reth_ethereum::{
|
||||
cli::{chainspec::EthereumChainSpecParser, interface::Cli},
|
||||
node::{EthereumAddOns, EthereumNode},
|
||||
};
|
||||
use tower::Layer;
|
||||
|
||||
fn main() {
|
||||
Cli::<EthereumChainSpecParser>::parse()
|
||||
.run(|builder, _| async move {
|
||||
let handle = builder
|
||||
.with_types::<EthereumNode>()
|
||||
.with_components(EthereumNode::components())
|
||||
.with_add_ons(
|
||||
//create ethereum addons with our custom rpc middleware
|
||||
EthereumAddOns::default().with_rpc_middleware(ResponseMutationLayer),
|
||||
)
|
||||
.launch_with_debug_capabilities()
|
||||
.await?;
|
||||
|
||||
handle.wait_for_node_exit().await
|
||||
})
|
||||
.unwrap();
|
||||
}
|
||||
|
||||
#[derive(Clone)]
|
||||
pub struct ResponseMutationLayer;
|
||||
|
||||
impl<S> Layer<S> for ResponseMutationLayer {
|
||||
type Service = ResponseMutationService<S>;
|
||||
|
||||
fn layer(&self, inner: S) -> Self::Service {
|
||||
ResponseMutationService { service: inner }
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Clone)]
|
||||
pub struct ResponseMutationService<S> {
|
||||
service: S,
|
||||
}
|
||||
|
||||
impl<S> RpcServiceT for ResponseMutationService<S>
|
||||
where
|
||||
S: RpcServiceT<
|
||||
MethodResponse = jsonrpsee::MethodResponse,
|
||||
BatchResponse = jsonrpsee::MethodResponse,
|
||||
NotificationResponse = jsonrpsee::MethodResponse,
|
||||
> + Send
|
||||
+ Sync
|
||||
+ Clone
|
||||
+ 'static,
|
||||
{
|
||||
type MethodResponse = S::MethodResponse;
|
||||
type NotificationResponse = S::NotificationResponse;
|
||||
type BatchResponse = S::BatchResponse;
|
||||
|
||||
fn call<'a>(&self, req: Request<'a>) -> impl Future<Output = Self::MethodResponse> + Send + 'a {
|
||||
tracing::info!("processed call {:?}", req);
|
||||
let service = self.service.clone();
|
||||
Box::pin(async move {
|
||||
let resp = service.call(req).await;
|
||||
|
||||
//we can modify the response with our own custom error
|
||||
if resp.is_error() {
|
||||
let err = ErrorObjectOwned::owned(
|
||||
-31404,
|
||||
"CustomError",
|
||||
Some("Our very own custom error message"),
|
||||
);
|
||||
return MethodResponse::error(Id::Number(1), err);
|
||||
}
|
||||
|
||||
//otherwise just return the original response
|
||||
resp
|
||||
})
|
||||
}
|
||||
|
||||
fn batch<'a>(&self, req: Batch<'a>) -> impl Future<Output = Self::BatchResponse> + Send + 'a {
|
||||
self.service.batch(req)
|
||||
}
|
||||
|
||||
fn notification<'a>(
|
||||
&self,
|
||||
n: Notification<'a>,
|
||||
) -> impl Future<Output = Self::NotificationResponse> + Send + 'a {
|
||||
self.service.notification(n)
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user