Compare commits

48 Commits

Author SHA1 Message Date
yongkangc
1ab3dceb3b fix: strengthen batch invariants and prevent blinded node starvation
- Change debug_assert to assert for multi_added_removed_keys Arc equality
  check in BatchedStorageProof::merge, ensuring incorrect proofs are
  caught in release builds, not just debug

- Change BatchedAccountProof::merge to try_merge returning Result, properly
  handling incompatible caches by processing as separate batches instead
  of panicking

- Add MAX_DEFERRED_BLINDED_NODES (16) limit to prevent starvation of
  blinded node requests under high proof load - stops batching early when
  limit reached

- Pre-allocate deferred_blinded_nodes vectors with capacity

- Remove unnecessary clone of storage_work_tx by taking reference
2025-12-09 10:30:34 +00:00
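The `try_merge` change in 1ab3dceb3b can be illustrated with a minimal sketch. It assumes simplified stand-in types: `Job`, `Batch`, `same_cache` and `batch_jobs` are hypothetical names, and a plain `Arc<Vec<u64>>` stands in for the shared caches. It is not the code from this change; it only shows the pattern of returning the rejected job in `Err` so the caller can start a separate batch instead of panicking.

```rust
use std::collections::BTreeSet;
use std::sync::Arc;

/// Hypothetical stand-in for a proof job. `cache` plays the role of the
/// shared `multi_added_removed_keys` / `missed_leaves_storage_roots` Arcs.
#[derive(Debug)]
struct Job {
    targets: BTreeSet<u64>,
    cache: Option<Arc<Vec<u64>>>,
}

/// True when both caches are absent or both point at the same allocation.
/// Equal contents in different allocations do NOT count as the same cache.
fn same_cache(a: &Option<Arc<Vec<u64>>>, b: &Option<Arc<Vec<u64>>>) -> bool {
    match (a, b) {
        (Some(a), Some(b)) => Arc::ptr_eq(a, b),
        (None, None) => true,
        _ => false,
    }
}

/// Hypothetical merged batch of jobs that share one cache.
#[derive(Debug)]
struct Batch {
    targets: BTreeSet<u64>,
    cache: Option<Arc<Vec<u64>>>,
    jobs_merged: usize,
}

impl Batch {
    fn new(job: Job) -> Self {
        Self { targets: job.targets, cache: job.cache, jobs_merged: 1 }
    }

    /// Merges `job` into the batch, or hands it back untouched in `Err`
    /// when its cache differs from the batch's cache.
    fn try_merge(&mut self, job: Job) -> Result<(), Job> {
        if !same_cache(&self.cache, &job.cache) {
            return Err(job);
        }
        self.targets.extend(job.targets);
        self.jobs_merged += 1;
        Ok(())
    }
}

/// Groups jobs into batches, opening a new batch whenever a merge is rejected.
fn batch_jobs(jobs: Vec<Job>) -> Vec<Batch> {
    let mut batches: Vec<Batch> = Vec::new();
    for job in jobs {
        let rejected = match batches.last_mut() {
            Some(current) => current.try_merge(job).err(),
            None => Some(job),
        };
        if let Some(job) = rejected {
            batches.push(Batch::new(job));
        }
    }
    batches
}
```

Returning the input in `Err` keeps ownership with the caller, so no job is dropped or cloned when a merge is refused.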
yongkangc
c9601ff5f5 perf(trie): use Arc for proof sharing and add batch validation
- Wrap DecodedMultiProof and DecodedStorageMultiProof in Arc within ProofResult
  for O(1) cloning when sending batched results to multiple receivers
- Add debug assertions in BatchedStorageProof::merge and BatchedAccountProof::merge
  to validate that all batched jobs share the same Arc for multi_added_removed_keys
  and missed_leaves_storage_roots (critical invariants for correctness)
- Unwrap Arc at extraction sites using try_unwrap for zero-cost when sole owner
2025-12-09 07:55:19 +00:00
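The Arc-sharing pattern described in c9601ff5f5 can be sketched with the standard library alone. The helper names `unwrap_or_clone` and `fan_out` are hypothetical and exist only for illustration; the `Arc::try_unwrap(..).unwrap_or_else(..)` expression is the one that appears in the diff.

```rust
use std::sync::Arc;

/// Returns the inner value without cloning when `shared` is the only strong
/// reference, and falls back to cloning the payload when other owners exist.
fn unwrap_or_clone<T: Clone>(shared: Arc<T>) -> T {
    Arc::try_unwrap(shared).unwrap_or_else(|arc| (*arc).clone())
}

/// Hypothetical fan-out: hands the same result to `receivers` consumers by
/// cloning the Arc (a reference-count increment), not the payload itself.
fn fan_out<T>(result: T, receivers: usize) -> Vec<Arc<T>> {
    let shared = Arc::new(result);
    (0..receivers).map(|_| Arc::clone(&shared)).collect()
}
```

With a single receiver the payload is moved out without a copy; with several receivers, every consumer except the last one to unwrap pays for a clone.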
yongkangc
3142c355e9 perf(trie): batch proof jobs at worker level
Implement worker-level batching for both storage and account proofs to
reduce redundant trie traversals when multiple proof requests queue up.

This complements the funnel-level batching in MultiProofTask by adding
an additional batching layer at the worker level.
2025-12-09 04:28:50 +00:00
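The worker-level batching in 3142c355e9 follows a drain-and-group pattern. The sketch below is a simplification under stated assumptions: it uses `std::sync::mpsc` rather than the crossbeam channel in the diff, the `Job` enum, `drain_batch`, and both constants are hypothetical stand-ins, and unlike the diff it also accepts a node request as the first job of a drain.

```rust
use std::collections::HashMap;
use std::sync::mpsc::Receiver;

/// Hypothetical job type: proofs are grouped by account, node requests are deferred.
enum Job {
    Proof { account: u32, slot: u64 },
    Node { id: u64 },
}

/// Assumed cap on proof jobs collected per drain.
const BATCH_LIMIT: usize = 32;
/// Assumed cap on deferred node requests; reaching it ends the drain early.
const MAX_DEFERRED: usize = 16;

/// Blocks for one job, then drains whatever is already queued without blocking.
/// Returns the merged slots per account plus the deferred node requests, or
/// `None` once the channel is closed and empty.
fn drain_batch(rx: &Receiver<Job>) -> Option<(HashMap<u32, Vec<u64>>, Vec<u64>)> {
    let mut proofs: HashMap<u32, Vec<u64>> = HashMap::new();
    let mut deferred: Vec<u64> = Vec::new();
    let mut total = 0usize;

    let first = rx.recv().ok()?;
    let mut next = Some(first);
    while let Some(job) = next {
        match job {
            Job::Proof { account, slot } => {
                // Same-account jobs collapse into one entry.
                proofs.entry(account).or_default().push(slot);
                total += 1;
            }
            Job::Node { id } => deferred.push(id),
        }
        if total >= BATCH_LIMIT || deferred.len() >= MAX_DEFERRED {
            break;
        }
        // Non-blocking: stop as soon as the queue is empty.
        next = rx.try_recv().ok();
    }
    Some((proofs, deferred))
}
```

Because the drain never blocks after the first job, batching only kicks in when requests are already queued, so an idle worker adds no latency.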
yongkangc
9e101dfff8 fmt and clippy 2025-12-08 09:17:58 +00:00
yongkangc
787532f79e reorder 2025-12-08 09:16:33 +00:00
yongkangc
55ad9f2c17 fix nit 2025-12-08 09:15:38 +00:00
yongkangc
1a8415fcce refactor(engine): reorganize and enhance state update batching logic
Reintroduce and refine functions for estimating target counts and checking state change sources. The logic for merging state updates has been clarified and optimized, ensuring better handling of transaction updates. This change improves code clarity and maintains efficient batching for multiproof tasks.
2025-12-08 09:13:38 +00:00
yongkangc
0493bcaefc docs(multiproof): clarify the creation of multiproof tasks with separate channels for control and proof results 2025-12-08 08:53:27 +00:00
yongkangc
c70c011867 doc 2025-12-08 08:43:44 +00:00
yongkangc
efb27b5c1c chore: fix fmt 2025-12-08 08:39:55 +00:00
yongkangc
6729cc6075 refactor(engine): adjust prefetch and state update batch sizes
Increase the maximum number of targets for prefetch batching from 500 to 512 and decrease the maximum for state updates from 128 to 64. These changes aim to optimize the batching logic and improve overall performance in handling proof requests and state updates.
2025-12-08 05:54:52 +00:00
yongkangc
8528e3a3b4 refactor(engine): increase state update batch size and improve batching logic
Adjust the maximum number of targets for state updates from 50 to 128 to accommodate additional processing requirements. Enhance the batching logic by preallocating the updates vector and clarifying the merging conditions for state updates, ensuring efficient handling of transaction updates.
2025-12-08 05:15:48 +00:00
yongkangc
e899f0b3ef fmt 2025-12-08 04:50:36 +00:00
yongkangc
a2c666cd34 refactor(engine): optimize prefetch batching for improved performance
Enhance the batching mechanism in MultiProofTask by preallocating the target vector to avoid repeated reallocations. This change simplifies the accumulation logic and ensures efficient handling of incoming messages, merging both prefetch targets and updates into single dispatch payloads.
2025-12-08 03:18:54 +00:00
yongkangc
b32253ab90 refactor(engine): simplify batching by removing immediate dispatch
Remove the "dispatch first message immediately" optimization per reviewer
feedback. The first message is now included in the batch and dispatched
together with accumulated messages, simplifying the code.
2025-12-07 02:15:10 +00:00
yongkangc
66f753109a refactor(engine): extract multiproof batch context into structs
Extract &mut parameters from process_multiproof_message into:
- MultiproofBatchCtx: core processing state (pending_msg, timing, updates_finished)
- MultiproofBatchMetrics: counters for proofs processed/requested

This improves code organization and reduces function parameter count.
2025-12-07 02:15:10 +00:00
yongkangc
80292fa243 fix(engine): rename outcome to num_chunks for clarity
Addresses reviewer nit: the variable returned from dispatch_with_chunking
represents number of chunks, so the name should reflect that.
2025-12-07 02:15:10 +00:00
yongkangc
5696ceda07 chore: fix fmt 2025-12-07 02:15:10 +00:00
yongkangc
e22a1def78 fmt 2025-12-07 02:15:10 +00:00
yongkangc
a650f8c749 comment 2025-12-07 02:15:09 +00:00
yongkangc
094b789aca remove ; 2025-12-07 02:15:09 +00:00
yongkangc
01e1daab53 comment 2025-12-07 02:15:09 +00:00
yongkangc
1cedc73483 fmt 2025-12-07 02:15:09 +00:00
yongkangc
ec7286536e fmt 2025-12-07 02:15:09 +00:00
yongkangc
64ff5485d6 fmt 2025-12-07 02:15:09 +00:00
yongkangc
0142b80632 enhance(tests): add detailed descriptions for batching tests
- Added descriptive comments to several test cases in `multiproof.rs` to clarify their purpose and expected behavior.
- Improved documentation for tests related to batching of prefetch proofs, state updates, and message ordering, ensuring better understanding and maintainability of the test suite.
2025-12-07 02:15:09 +00:00
yongkangc
59d30c2203 remove trace 2025-12-07 02:15:09 +00:00
yongkangc
29f436d073 refactor(multiproof): remove unused metrics and streamline batching logic
- Eliminated several unused metrics related to worker availability and chunking performance from `MultiProofTaskMetrics`.
- Updated the batching logic to simplify the handling of state updates, ensuring that the first message is dispatched immediately while accumulating additional messages for batch processing.
- Improved code clarity by consolidating conditional checks for batching state updates.
2025-12-07 02:15:09 +00:00
yongkangc
0d8e70e706 clippy, 2025-12-07 02:15:09 +00:00
yongkangc
0bdb90d704 feat(multiproof): enhance batching logic and introduce chunking
- Added `can_batch_state_update` function to determine if two state updates can be merged based on their sources and payloads.
- Implemented `dispatch_with_chunking` function to handle work items in chunks, improving efficiency based on worker availability and target sizes.
- Updated `MultiProofTask` to utilize the new chunking and batching logic, ensuring better handling of proof dispatches and state updates.
- Added tests to verify the correctness of the new batching behavior, particularly for pre-block updates requiring identical payloads for merging.
2025-12-07 02:15:09 +00:00
yongkangc
85f660e2ca revert comparison 2025-12-07 02:15:09 +00:00
yongkangc
96f9948bab restore prev chunking 2025-12-07 02:15:09 +00:00
yongkangc
d03d9317b6 refactor(multiproof): decrease batching size
- Renamed constants for maximum batch targets and messages to better reflect their purpose: `DEFAULT_MAX_BATCH_TARGETS` to `PREFETCH_MAX_BATCH_TARGETS` and `DEFAULT_MAX_BATCH_MESSAGES` to `PREFETCH_MAX_BATCH_MESSAGES`.
- Introduced a new constant `STATE_UPDATE_MAX_BATCH_TARGETS` to manage state update batching limits.
- Updated batching logic in `MultiProofTask` to utilize the new constants, ensuring clearer intent and improved maintainability.
2025-12-07 02:15:09 +00:00
yongkangc
157b38d66f feat(multiproof): add state change source comparison for batching
- Introduced a new function `same_state_change_source` to determine if two state change sources originate from the same source.
- Updated the batching logic in `MultiProofTask` to utilize this function, ensuring that state updates from different sources are correctly separated during processing.
- Added a test case to verify that the batching mechanism correctly handles and separates state updates from different sources, maintaining the integrity of the batching process.
2025-12-07 02:15:09 +00:00
yongkangc
5910e6a2ed fix(multiproof): always chunk large batches and fix priority inversion
Problem:
The batching feature caused an 80% regression in newPayload latency. Root cause
analysis revealed three issues:

1. **Vicious cycle in chunking logic**: The `should_chunk` condition checked if
   workers were available before chunking. When workers were busy (available=0),
   chunking was skipped, creating larger proofs that kept workers busy longer.
   Metrics showed proof chunks = 1 (no chunking) despite large batches.

2. **Priority inversion with pending_msg**: The main loop processed pending
   messages before checking proof results, violating select_biased! priority
   and starving worker completion signals.

3. **estimate_evm_state_targets undercount**: The estimate was 1 less per
   account with storage slots, allowing batches to exceed the 500 target limit.

Solution:
- Remove worker availability check from chunking decision - always chunk if
  targets exceed threshold. Workers will pick up chunks as they become free.
- Drain proof results with try_recv() BEFORE processing pending messages to
  maintain correct priority.
- Fix estimate to match HashedPostState::chunking_length: 1 + changed_slots
  instead of 1 + changed_slots.saturating_sub(1).

Changes:
- Remove unused `max_targets_for_chunking` field and constant
- Remove `chunking_skipped_busy` counter (no longer applicable)
- Add priority drain loop for proof results before pending_msg processing
- Fix estimate_evm_state_targets formula

Expected Impact:
- Chunking will now happen regardless of worker availability
- Proof results will be processed with proper priority
- Batch sizes will stay within limits
2025-12-07 02:15:09 +00:00
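The estimate fix and the unconditional chunking described in 5910e6a2ed can be checked with a small sketch. The function names, the slice-of-counts input, and the fixed-size chunk split are assumptions made for illustration; only the two formulas (`1 + changed_slots` versus `1 + changed_slots.saturating_sub(1)`) come from the commit message.

```rust
/// Corrected estimate: one target per account plus one per changed slot.
/// The input is a hypothetical list of changed-slot counts, one per account.
fn estimate_targets(changed_slots_per_account: &[usize]) -> usize {
    changed_slots_per_account.iter().map(|slots| 1 + slots).sum()
}

/// Previous estimate: undercounts by one for every account that has at
/// least one changed slot.
fn estimate_targets_old(changed_slots_per_account: &[usize]) -> usize {
    changed_slots_per_account
        .iter()
        .map(|slots| 1 + slots.saturating_sub(1))
        .sum()
}

/// Splits `total_targets` into chunks of at most `chunk_size`, without
/// consulting worker availability (the assumed shape of "always chunk").
fn chunk_sizes(total_targets: usize, chunk_size: usize) -> Vec<usize> {
    assert!(chunk_size > 0, "chunk_size must be non-zero");
    let mut sizes = Vec::new();
    let mut remaining = total_targets;
    while remaining > 0 {
        let take = remaining.min(chunk_size);
        sizes.push(take);
        remaining -= take;
    }
    sizes
}
```

For three accounts with 0, 3, and 5 changed slots, the corrected estimate is 11 and the old one is 9: the gap equals the number of accounts with storage changes, which is how batches could slip past the target limit.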
yongkangc
265bde6b3b feat(metrics): add P1 metrics for chunking decision analysis
Add metrics to understand why chunking is skipped:
- available_storage_workers_at_dispatch_histogram: workers available when dispatching
- available_account_workers_at_dispatch_histogram: workers available when dispatching
- chunking_skipped_busy: counter for when chunking skipped due to busy workers
- chunking_skipped_below_threshold: counter for when targets below chunking threshold
- chunking_performed: counter for successful chunking

These metrics help diagnose the vicious cycle where chunking is disabled
exactly when it's most needed (when workers are busy with large proofs).
2025-12-07 02:15:09 +00:00
yongkangc
10def2ce7d refactor(multiproof): restore batch limits to enhance processing efficiency
Reverted DEFAULT_MAX_BATCH_TARGETS to 500 and DEFAULT_MAX_BATCH_MESSAGES to 16 to improve processing efficiency. Implemented a dispatch-then-batch strategy for handling PrefetchProofs and StateUpdate messages, allowing immediate dispatch of the first message while accumulating additional messages for batch processing. This change aims to maintain pipeline flow and optimize message handling.
2025-12-07 02:15:09 +00:00
yongkangc
5be4f29c1b refactor(multiproof): reduce batch limits to enhance pipeline parallelism
Decreased DEFAULT_MAX_BATCH_TARGETS from 500 to 50 and DEFAULT_MAX_BATCH_MESSAGES from 16 to 2 to improve processing efficiency and maintain pipeline parallelism during multiproof message handling.
2025-12-07 02:15:09 +00:00
yongkangc
23b8aa3a49 feat: batch by target instead 2025-12-07 02:15:09 +00:00
yongkangc
a94fba0a80 feat(engine): multiproof batching with target limits and metrics
Introduces constants for maximum batch targets and messages to improve multiproof message processing efficiency. Adds an estimation function for target counts during batching, ensuring that the number of processed messages does not exceed defined limits. Additionally, new histograms are implemented to track the sizes of prefetch and state update batches, enhancing observability of the batching process.
2025-12-07 02:15:09 +00:00
yongkangc
0c79d4387a fix: remove unnecessary mut from test variables
Fixes clippy warning about unused-mut in multiproof tests.
2025-12-07 02:15:09 +00:00
yongkangc
897dc29a1a fmt 2025-12-07 02:15:09 +00:00
yongkangc
240bf37df0 feat(engine): enhance multiproof message processing with batching
Introduces a new method to process multiproof messages, allowing for the batching of consecutive same-type messages. This change improves efficiency by reducing redundant trie traversals when messages arrive in quick succession. The implementation ensures that if a different message type is encountered, it is stored for processing in the next iteration, preserving the order of messages. Additionally, tests have been added to verify that message ordering is maintained during batching.
2025-12-07 02:15:09 +00:00
yongkangc
d39ff64ab3 fix(engine): preserve message ordering in multiproof batching
When batching PrefetchProofs or StateUpdate messages, if a different
message type is encountered during try_recv(), it was being sent back
to the channel via self.tx.send(). This puts the message at the END
of the queue instead of preserving its position, breaking the ordering
of StateUpdate messages which is critical for ProofSequencer.

Fix: Store the pending message in a local buffer and handle it
immediately after processing the batch, before the next select_biased
iteration.
2025-12-07 02:15:09 +00:00
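The ordering fix in d39ff64ab3 can be sketched as follows. This is an illustration under assumptions, not the actual `MultiProofTask` loop: `Msg`, `collect_prefetch_batch`, and `run` are hypothetical names, and `std::sync::mpsc` replaces the channel and `select_biased!` machinery of the real code. The point is that a message of a different type is parked in a local `pending` slot and handled next, instead of being re-sent to the back of the queue.

```rust
use std::sync::mpsc::Receiver;

/// Hypothetical message types standing in for PrefetchProofs and StateUpdate.
#[derive(Debug)]
enum Msg {
    Prefetch(u32),
    Update(u32),
}

/// Batches consecutive `Prefetch` messages. The first message of another type
/// is parked in `pending` so it keeps its position in the stream.
fn collect_prefetch_batch(
    first: u32,
    rx: &Receiver<Msg>,
    pending: &mut Option<Msg>,
) -> Vec<u32> {
    let mut batch = vec![first];
    while let Ok(msg) = rx.try_recv() {
        match msg {
            Msg::Prefetch(id) => batch.push(id),
            other => {
                *pending = Some(other);
                break;
            }
        }
    }
    batch
}

/// Processes everything currently queued and returns a log of handled work.
fn run(rx: &Receiver<Msg>) -> Vec<String> {
    let mut log = Vec::new();
    let mut pending: Option<Msg> = None;
    loop {
        // The parked message is always handled before reading the channel again.
        let msg = match pending.take() {
            Some(m) => m,
            None => match rx.try_recv() {
                Ok(m) => m,
                Err(_) => break,
            },
        };
        match msg {
            Msg::Prefetch(id) => {
                let batch = collect_prefetch_batch(id, rx, &mut pending);
                log.push(format!("prefetch{:?}", batch));
            }
            Msg::Update(id) => log.push(format!("update[{}]", id)),
        }
    }
    log
}
```

Had the parked `Update` been re-sent through the channel, it would have landed behind any messages queued after it, which is the reordering the commit describes.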
0xSooki
2a7c3351cb fix(multiproof): batching logic dropping messages 2025-12-07 02:15:09 +00:00
0xSooki
a7bec82416 test: add batching tests for prefetch proofs and state updates 2025-12-07 02:15:09 +00:00
0xSooki
98ec4ce80c feat(engine): update multiproof task to batch prefetch proofs and state updates 2025-12-07 02:15:09 +00:00
yongkangc
3318ed0e5c Add bench compare latency stats 2025-12-07 02:15:09 +00:00
3 changed files with 1914 additions and 372 deletions

File diff suppressed because it is too large

View File

@@ -126,7 +126,8 @@ impl ParallelProof {
)))
})?;
// Extract storage proof directly from the result
// Extract storage proof directly from the result.
// The proof is Arc-wrapped for efficient batch sharing, so we unwrap it here.
let storage_proof = match proof_msg.result? {
crate::proof_task::ProofResult::StorageProof { hashed_address: addr, proof } => {
debug_assert_eq!(
@@ -134,7 +135,8 @@ impl ParallelProof {
hashed_address,
"storage worker must return same address: expected {hashed_address}, got {addr}"
);
proof
// Efficiently unwrap Arc: returns inner value if sole owner, clones otherwise.
Arc::try_unwrap(proof).unwrap_or_else(|arc| (*arc).clone())
}
crate::proof_task::ProofResult::AccountMultiproof { .. } => {
unreachable!("storage worker only sends StorageProof variant")
@@ -223,8 +225,12 @@ impl ParallelProof {
)
})?;
// The proof is Arc-wrapped for efficient batch sharing, so we unwrap it here.
let (multiproof, stats) = match proof_result_msg.result? {
crate::proof_task::ProofResult::AccountMultiproof { proof, stats } => (proof, stats),
crate::proof_task::ProofResult::AccountMultiproof { proof, stats } => {
// Efficiently unwrap Arc: returns inner value if sole owner, clones otherwise.
(Arc::try_unwrap(proof).unwrap_or_else(|arc| (*arc).clone()), stats)
}
crate::proof_task::ProofResult::StorageProof { .. } => {
unreachable!("account worker only sends AccountMultiproof variant")
}

View File

@@ -41,6 +41,7 @@ use alloy_primitives::{
use alloy_rlp::{BufMut, Encodable};
use crossbeam_channel::{unbounded, Receiver as CrossbeamReceiver, Sender as CrossbeamSender};
use dashmap::DashMap;
use metrics::Histogram;
use reth_execution_errors::{SparseTrieError, SparseTrieErrorKind};
use reth_provider::{DatabaseProviderROFactory, ProviderError, ProviderResult};
use reth_storage_errors::db::DatabaseError;
@@ -79,6 +80,275 @@ use crate::proof_task_metrics::{
type StorageProofResult = Result<DecodedStorageMultiProof, ParallelStateRootError>;
type TrieNodeProviderResult = Result<Option<RevealedNode>, SparseTrieError>;
/// Maximum number of storage proof jobs to batch together per account.
const STORAGE_PROOF_BATCH_LIMIT: usize = 32;
/// Maximum number of blinded node requests to defer during storage proof batching.
/// When this limit is reached, batching stops early to process deferred nodes,
/// preventing starvation of blinded node requests under high proof load.
const MAX_DEFERRED_BLINDED_NODES: usize = 16;
/// Holds batched storage proof jobs for the same account.
///
/// When multiple storage proof requests arrive for the same account, they can be merged
/// into a single proof computation with combined prefix sets and target slots.
#[derive(Debug)]
struct BatchedStorageProof {
/// The merged prefix set from all batched jobs.
prefix_set: PrefixSetMut,
/// The merged target slots from all batched jobs.
target_slots: B256Set,
/// Whether any job requested branch node masks.
with_branch_node_masks: bool,
/// The `multi_added_removed_keys` from the first job (they should all share the same `Arc`).
multi_added_removed_keys: Option<Arc<MultiAddedRemovedKeys>>,
/// All senders that need to receive the result.
senders: Vec<ProofResultContext>,
}
impl BatchedStorageProof {
/// Creates a new batch from the first storage proof input.
fn new(input: StorageProofInput, sender: ProofResultContext) -> Self {
// Convert frozen PrefixSet to mutable PrefixSetMut by collecting its keys.
let prefix_set = PrefixSetMut::from(input.prefix_set.iter().copied());
Self {
prefix_set,
target_slots: input.target_slots,
with_branch_node_masks: input.with_branch_node_masks,
multi_added_removed_keys: input.multi_added_removed_keys,
senders: vec![sender],
}
}
/// Merges another storage proof job into this batch.
///
/// # Panics
/// Panics if `input.multi_added_removed_keys` does not point to the same Arc as the batch's.
/// This is a critical invariant for proof correctness.
fn merge(&mut self, input: StorageProofInput, sender: ProofResultContext) {
// Validate that all batched jobs share the same multi_added_removed_keys Arc.
// This is a critical invariant: if jobs have different keys, the merged proof
// would be computed with only the first job's keys, producing incorrect results.
// Using assert! (not debug_assert!) because incorrect proofs could cause consensus
// failures.
assert!(
match (&self.multi_added_removed_keys, &input.multi_added_removed_keys) {
(Some(a), Some(b)) => Arc::ptr_eq(a, b),
(None, None) => true,
_ => false,
},
"All batched storage proof jobs must share the same multi_added_removed_keys Arc"
);
self.prefix_set.extend_keys(input.prefix_set.iter().copied());
self.target_slots.extend(input.target_slots);
self.with_branch_node_masks |= input.with_branch_node_masks;
self.senders.push(sender);
}
/// Converts this batch into a single `StorageProofInput` for computation.
fn into_input(self, hashed_address: B256) -> (StorageProofInput, Vec<ProofResultContext>) {
let input = StorageProofInput {
hashed_address,
prefix_set: self.prefix_set.freeze(),
target_slots: self.target_slots,
with_branch_node_masks: self.with_branch_node_masks,
multi_added_removed_keys: self.multi_added_removed_keys,
};
(input, self.senders)
}
}
/// Metrics for storage worker batching.
#[derive(Clone, Default)]
struct StorageWorkerBatchMetrics {
/// Histogram of batch sizes (number of jobs merged per computation).
#[cfg(feature = "metrics")]
batch_size_histogram: Option<Histogram>,
}
impl StorageWorkerBatchMetrics {
#[cfg(feature = "metrics")]
fn new() -> Self {
Self {
batch_size_histogram: Some(metrics::histogram!(
"trie.proof_task.storage_worker_batch_size"
)),
}
}
#[cfg(not(feature = "metrics"))]
fn new() -> Self {
Self {}
}
fn record_batch_size(&self, _size: usize) {
#[cfg(feature = "metrics")]
if let Some(h) = &self.batch_size_histogram {
h.record(_size as f64);
}
}
}
/// Maximum number of account multiproof jobs to batch together.
const ACCOUNT_PROOF_BATCH_LIMIT: usize = 32;
/// Holds batched account multiproof jobs.
///
/// When multiple account multiproof requests arrive, they can be merged
/// into a single proof computation with combined targets and prefix sets.
#[derive(Debug)]
struct BatchedAccountProof {
/// The merged targets from all batched jobs.
targets: MultiProofTargets,
/// The merged account prefix set from all batched jobs.
account_prefix_set: PrefixSetMut,
/// The merged storage prefix sets from all batched jobs.
storage_prefix_sets: B256Map<PrefixSetMut>,
/// The merged destroyed accounts from all batched jobs.
destroyed_accounts: B256Set,
/// Whether any job requested branch node masks.
collect_branch_node_masks: bool,
/// The `multi_added_removed_keys` from the first job (they should all share the same `Arc`).
multi_added_removed_keys: Option<Arc<MultiAddedRemovedKeys>>,
/// The shared `missed_leaves_storage_roots` cache from the first job.
missed_leaves_storage_roots: Arc<DashMap<B256, B256>>,
/// All senders that need to receive the result.
senders: Vec<ProofResultContext>,
}
impl BatchedAccountProof {
/// Creates a new batch from the first account multiproof input.
fn new(input: AccountMultiproofInput) -> Self {
// Convert frozen prefix sets to mutable versions.
let account_prefix_set =
PrefixSetMut::from(input.prefix_sets.account_prefix_set.iter().copied());
let storage_prefix_sets = input
.prefix_sets
.storage_prefix_sets
.into_iter()
.map(|(addr, ps)| (addr, PrefixSetMut::from(ps.iter().copied())))
.collect();
let destroyed_accounts = input.prefix_sets.destroyed_accounts;
Self {
targets: input.targets,
account_prefix_set,
storage_prefix_sets,
destroyed_accounts,
collect_branch_node_masks: input.collect_branch_node_masks,
multi_added_removed_keys: input.multi_added_removed_keys,
missed_leaves_storage_roots: input.missed_leaves_storage_roots,
senders: vec![input.proof_result_sender],
}
}
/// Attempts to merge another account multiproof job into this batch.
///
/// Returns the job back if caches are incompatible so the caller can process it separately.
fn try_merge(&mut self, input: AccountMultiproofInput) -> Result<(), AccountMultiproofInput> {
// Require all jobs to share the same caches; otherwise merging would produce
// incorrect proofs by reusing the wrong retained keys or missed-leaf storage roots.
let multi_added_removed_keys_mismatch =
!match (&self.multi_added_removed_keys, &input.multi_added_removed_keys) {
(Some(a), Some(b)) => Arc::ptr_eq(a, b),
(None, None) => true,
_ => false,
};
if multi_added_removed_keys_mismatch ||
!Arc::ptr_eq(&self.missed_leaves_storage_roots, &input.missed_leaves_storage_roots)
{
return Err(input);
}
// Merge targets.
self.targets.extend(input.targets);
// Merge account prefix set.
self.account_prefix_set.extend_keys(input.prefix_sets.account_prefix_set.iter().copied());
// Merge storage prefix sets.
for (addr, ps) in input.prefix_sets.storage_prefix_sets {
match self.storage_prefix_sets.entry(addr) {
alloy_primitives::map::Entry::Occupied(mut entry) => {
entry.get_mut().extend_keys(ps.iter().copied());
}
alloy_primitives::map::Entry::Vacant(entry) => {
entry.insert(PrefixSetMut::from(ps.iter().copied()));
}
}
}
// Merge destroyed accounts.
self.destroyed_accounts.extend(input.prefix_sets.destroyed_accounts);
// OR the branch node masks flag.
self.collect_branch_node_masks |= input.collect_branch_node_masks;
// Collect the sender.
self.senders.push(input.proof_result_sender);
Ok(())
}
/// Converts this batch into a single `AccountMultiproofInput` for computation.
fn into_input(self) -> (AccountMultiproofInput, Vec<ProofResultContext>) {
// Freeze the mutable prefix sets.
let storage_prefix_sets: B256Map<PrefixSet> =
self.storage_prefix_sets.into_iter().map(|(addr, ps)| (addr, ps.freeze())).collect();
let prefix_sets = TriePrefixSets {
account_prefix_set: self.account_prefix_set.freeze(),
storage_prefix_sets,
destroyed_accounts: self.destroyed_accounts,
};
// Reuse the first sender as a placeholder for the input; results are delivered to every
// sender in the returned list, not through this field.
let dummy_sender = self.senders.first().expect("batch always has at least one sender");
let input = AccountMultiproofInput {
targets: self.targets,
prefix_sets,
collect_branch_node_masks: self.collect_branch_node_masks,
multi_added_removed_keys: self.multi_added_removed_keys,
missed_leaves_storage_roots: self.missed_leaves_storage_roots,
proof_result_sender: dummy_sender.clone(),
};
(input, self.senders)
}
}
/// Metrics for account worker batching.
#[derive(Clone, Default)]
struct AccountWorkerBatchMetrics {
/// Histogram of batch sizes (number of jobs merged per computation).
#[cfg(feature = "metrics")]
batch_size_histogram: Option<Histogram>,
}
impl AccountWorkerBatchMetrics {
#[cfg(feature = "metrics")]
fn new() -> Self {
Self {
batch_size_histogram: Some(metrics::histogram!(
"trie.proof_task.account_worker_batch_size"
)),
}
}
#[cfg(not(feature = "metrics"))]
fn new() -> Self {
Self {}
}
fn record_batch_size(&self, _size: usize) {
#[cfg(feature = "metrics")]
if let Some(h) = &self.batch_size_histogram {
h.record(_size as f64);
}
}
}
/// A handle that provides type-safe access to proof worker pools.
///
/// The handle stores direct senders to both storage and account worker pools,
@@ -552,12 +822,16 @@ impl TrieNodeProvider for ProofTaskTrieNodeProvider {
}
}
/// Result of a proof calculation, which can be either an account multiproof or a storage proof.
#[derive(Debug)]
///
/// The proof data is wrapped in `Arc` to enable efficient sharing when batching multiple
/// proof requests. This avoids expensive cloning of the underlying proof structures
/// when sending results to multiple receivers.
#[derive(Debug, Clone)]
pub enum ProofResult {
/// Account multiproof with statistics
AccountMultiproof {
/// The account multiproof
proof: DecodedMultiProof,
/// The account multiproof (Arc-wrapped for efficient sharing in batches)
proof: Arc<DecodedMultiProof>,
/// Statistics collected during proof computation
stats: ParallelTrieStats,
},
@@ -565,8 +839,8 @@ pub enum ProofResult {
StorageProof {
/// The hashed address this storage proof belongs to
hashed_address: B256,
/// The storage multiproof
proof: DecodedStorageMultiProof,
/// The storage multiproof (Arc-wrapped for efficient sharing in batches)
proof: Arc<DecodedStorageMultiProof>,
},
}
@@ -575,11 +849,17 @@ impl ProofResult {
///
/// For account multiproofs, returns the multiproof directly (discarding stats).
/// For storage proofs, wraps the storage proof into a minimal multiproof.
///
/// Note: This method avoids cloning when it holds the only reference to the proof data
/// (via `Arc::try_unwrap`) and clones the inner proof only if the `Arc` is still shared.
pub fn into_multiproof(self) -> DecodedMultiProof {
match self {
Self::AccountMultiproof { proof, stats: _ } => proof,
Self::AccountMultiproof { proof, stats: _ } => {
Arc::try_unwrap(proof).unwrap_or_else(|arc| (*arc).clone())
}
Self::StorageProof { hashed_address, proof } => {
DecodedMultiProof::from_storage_proof(hashed_address, proof)
let storage_proof = Arc::try_unwrap(proof).unwrap_or_else(|arc| (*arc).clone());
DecodedMultiProof::from_storage_proof(hashed_address, storage_proof)
}
}
}
@@ -708,11 +988,18 @@ where
/// 2. Advertises availability
/// 3. Processes jobs in a loop:
/// - Receives job from channel
/// - Drains additional same-account storage proof jobs (batching)
/// - Marks worker as busy
/// - Processes the job
/// - Processes the batched jobs as a single proof computation
/// - Marks worker as available
/// 4. Shuts down when channel closes
///
/// # Batching Strategy
///
/// When multiple storage proof requests arrive for the same account, they are merged
/// into a single proof computation. This reduces redundant trie traversals when state
/// updates arrive faster than proof computation can process them.
///
/// # Panic Safety
///
/// If this function panics, the worker thread terminates but other workers
@@ -732,6 +1019,7 @@ where
// Create provider from factory
let provider = task_ctx.factory.database_provider_ro()?;
let proof_tx = ProofTaskTx::new(provider, worker_id);
let batch_metrics = StorageWorkerBatchMetrics::new();
trace!(
target: "trie::proof_task",
@@ -746,20 +1034,104 @@ where
// Initially mark this worker as available.
available_workers.fetch_add(1, Ordering::Relaxed);
// Deferred blinded node jobs to process after batched storage proofs.
// Pre-allocate with capacity to avoid reallocations during batching.
let mut deferred_blinded_nodes: Vec<(B256, Nibbles, Sender<TrieNodeProviderResult>)> =
Vec::with_capacity(MAX_DEFERRED_BLINDED_NODES);
while let Ok(job) = work_rx.recv() {
// Mark worker as busy.
available_workers.fetch_sub(1, Ordering::Relaxed);
match job {
StorageWorkerJob::StorageProof { input, proof_result_sender } => {
Self::process_storage_proof(
worker_id,
&proof_tx,
input,
proof_result_sender,
&mut storage_proofs_processed,
&mut cursor_metrics_cache,
// Start batching: group storage proofs by account.
let mut batches: B256Map<BatchedStorageProof> = B256Map::default();
batches.insert(
input.hashed_address,
BatchedStorageProof::new(input, proof_result_sender),
);
let mut total_jobs = 1usize;
// Drain additional jobs from the queue.
while total_jobs < STORAGE_PROOF_BATCH_LIMIT {
match work_rx.try_recv() {
Ok(StorageWorkerJob::StorageProof {
input: next_input,
proof_result_sender: next_sender,
}) => {
total_jobs += 1;
let addr = next_input.hashed_address;
match batches.entry(addr) {
alloy_primitives::map::Entry::Occupied(mut entry) => {
entry.get_mut().merge(next_input, next_sender);
}
alloy_primitives::map::Entry::Vacant(entry) => {
entry.insert(BatchedStorageProof::new(
next_input,
next_sender,
));
}
}
}
Ok(StorageWorkerJob::BlindedStorageNode {
account,
path,
result_sender,
}) => {
// Defer blinded node jobs to process after batched proofs.
deferred_blinded_nodes.push((account, path, result_sender));
// Stop batching if too many blinded nodes are deferred to prevent
// starvation.
if deferred_blinded_nodes.len() >= MAX_DEFERRED_BLINDED_NODES {
break;
}
}
Err(_) => break,
}
}
// Process all batched storage proofs.
for (hashed_address, batch) in batches {
let batch_size = batch.senders.len();
batch_metrics.record_batch_size(batch_size);
let (merged_input, senders) = batch.into_input(hashed_address);
trace!(
target: "trie::proof_task",
worker_id,
?hashed_address,
batch_size,
prefix_set_len = merged_input.prefix_set.len(),
target_slots_len = merged_input.target_slots.len(),
"Processing batched storage proof"
);
Self::process_batched_storage_proof(
worker_id,
&proof_tx,
hashed_address,
merged_input,
senders,
&mut storage_proofs_processed,
&mut cursor_metrics_cache,
);
}
// Process any deferred blinded node jobs.
for (account, path, result_sender) in
std::mem::take(&mut deferred_blinded_nodes)
{
Self::process_blinded_node(
worker_id,
&proof_tx,
account,
path,
result_sender,
&mut storage_nodes_processed,
);
}
}
StorageWorkerJob::BlindedStorageNode { account, path, result_sender } => {
@@ -795,82 +1167,103 @@ where
Ok(())
}
/// Processes a storage proof request.
fn process_storage_proof<Provider>(
/// Processes a batched storage proof request and sends results to all waiting receivers.
///
/// This computes a single storage proof with merged targets and sends the same result
/// to all original requestors, reducing redundant trie traversals.
fn process_batched_storage_proof<Provider>(
worker_id: usize,
proof_tx: &ProofTaskTx<Provider>,
hashed_address: B256,
input: StorageProofInput,
proof_result_sender: ProofResultContext,
senders: Vec<ProofResultContext>,
storage_proofs_processed: &mut u64,
cursor_metrics_cache: &mut ProofTaskCursorMetricsCache,
) where
Provider: TrieCursorFactory + HashedCursorFactory,
{
let hashed_address = input.hashed_address;
let ProofResultContext { sender, sequence_number: seq, state, start_time } =
proof_result_sender;
let mut trie_cursor_metrics = TrieCursorMetricsCache::default();
let mut hashed_cursor_metrics = HashedCursorMetricsCache::default();
trace!(
target: "trie::proof_task",
worker_id,
hashed_address = ?hashed_address,
prefix_set_len = input.prefix_set.len(),
target_slots_len = input.target_slots.len(),
"Processing storage proof"
);
let proof_start = Instant::now();
let result = proof_tx.compute_storage_proof(
input,
&mut trie_cursor_metrics,
&mut hashed_cursor_metrics,
);
let proof_elapsed = proof_start.elapsed();
*storage_proofs_processed += 1;
let result_msg = result.map(|storage_proof| ProofResult::StorageProof {
hashed_address,
proof: storage_proof,
});
// Send the result to all waiting receivers.
let num_senders = senders.len();
match result {
Ok(storage_proof) => {
// Success case: wrap proof in Arc for efficient sharing across all senders.
let proof_result =
ProofResult::StorageProof { hashed_address, proof: Arc::new(storage_proof) };
if sender
.send(ProofResultMessage {
sequence_number: seq,
result: result_msg,
elapsed: start_time.elapsed(),
state,
})
.is_err()
{
trace!(
target: "trie::proof_task",
worker_id,
hashed_address = ?hashed_address,
storage_proofs_processed,
"Proof result receiver dropped, discarding result"
);
for ProofResultContext { sender, sequence_number, state, start_time } in senders {
*storage_proofs_processed += 1;
if sender
.send(ProofResultMessage {
sequence_number,
result: Ok(proof_result.clone()),
elapsed: start_time.elapsed(),
state,
})
.is_err()
{
trace!(
target: "trie::proof_task",
worker_id,
?hashed_address,
sequence_number,
"Proof result receiver dropped, discarding result"
);
}
}
}
Err(error) => {
// Error case: stringify the error once so each receiver gets its own
// `ParallelStateRootError::Other` copy.
let error_msg = error.to_string();
for ProofResultContext { sender, sequence_number, state, start_time } in senders {
*storage_proofs_processed += 1;
if sender
.send(ProofResultMessage {
sequence_number,
result: Err(ParallelStateRootError::Other(error_msg.clone())),
elapsed: start_time.elapsed(),
state,
})
.is_err()
{
trace!(
target: "trie::proof_task",
worker_id,
?hashed_address,
sequence_number,
"Proof result receiver dropped, discarding result"
);
}
}
}
}
trace!(
target: "trie::proof_task",
worker_id,
hashed_address = ?hashed_address,
?hashed_address,
proof_time_us = proof_elapsed.as_micros(),
total_processed = storage_proofs_processed,
num_senders,
trie_cursor_duration_us = trie_cursor_metrics.total_duration.as_micros(),
hashed_cursor_duration_us = hashed_cursor_metrics.total_duration.as_micros(),
?trie_cursor_metrics,
?hashed_cursor_metrics,
"Storage proof completed"
"Batched storage proof completed"
);
#[cfg(feature = "metrics")]
{
// Accumulate per-proof metrics into the worker's cache
let per_proof_cache = ProofTaskCursorMetricsCache {
account_trie_cursor: TrieCursorMetricsCache::default(),
account_hashed_cursor: HashedCursorMetricsCache::default(),
@@ -987,11 +1380,18 @@ where
/// 2. Advertises availability
/// 3. Processes jobs in a loop:
/// - Receives job from channel
/// - Drains additional account multiproof jobs (batching)
/// - Marks worker as busy
/// - Processes the job
/// - Processes the batched jobs as a single proof computation
/// - Marks worker as available
/// 4. Shuts down when channel closes
///
/// # Batching Strategy
///
/// When multiple account multiproof requests are queued, up to
/// `ACCOUNT_PROOF_BATCH_LIMIT` of them are merged into a single proof computation;
/// a request with incompatible caches starts a new batch. This avoids redundant trie
/// traversals when state updates arrive faster than proofs can be computed.
///
/// # Panic Safety
///
/// If this function panics, the worker thread terminates but other workers
@@ -1012,6 +1412,7 @@ where
// Create provider from factory
let provider = task_ctx.factory.database_provider_ro()?;
let proof_tx = ProofTaskTx::new(provider, worker_id);
let batch_metrics = AccountWorkerBatchMetrics::new();
trace!(
target: "trie::proof_task",
@@ -1026,20 +1427,95 @@ where
// Count this worker as available only after successful initialization.
available_workers.fetch_add(1, Ordering::Relaxed);
// Deferred blinded node jobs to process after batched account proofs.
// Pre-allocate with capacity to avoid reallocations during batching.
let mut deferred_blinded_nodes: Vec<(Nibbles, Sender<TrieNodeProviderResult>)> =
Vec::with_capacity(MAX_DEFERRED_BLINDED_NODES);
while let Ok(job) = work_rx.recv() {
// Mark worker as busy.
available_workers.fetch_sub(1, Ordering::Relaxed);
match job {
AccountWorkerJob::AccountMultiproof { input } => {
Self::process_account_multiproof(
worker_id,
&proof_tx,
storage_work_tx.clone(),
*input,
&mut account_proofs_processed,
&mut cursor_metrics_cache,
);
// Start batching: accumulate account multiproof jobs. If we encounter an
// incompatible job (different caches), process it as a separate batch.
let mut next_account_job: Option<Box<AccountMultiproofInput>> = Some(input);
while let Some(account_job) = next_account_job.take() {
let mut batch = BatchedAccountProof::new(*account_job);
let mut pending_incompatible: Option<Box<AccountMultiproofInput>> = None;
// Drain additional jobs from the queue.
while batch.senders.len() < ACCOUNT_PROOF_BATCH_LIMIT {
match work_rx.try_recv() {
Ok(AccountWorkerJob::AccountMultiproof { input: next_input }) => {
match batch.try_merge(*next_input) {
Ok(()) => {}
Err(incompatible) => {
trace!(
target: "trie::proof_task",
worker_id,
"Account multiproof batch split due to incompatible caches"
);
pending_incompatible = Some(Box::new(incompatible));
break;
}
}
}
Ok(AccountWorkerJob::BlindedAccountNode {
path,
result_sender,
}) => {
// Defer blinded node jobs to process after batched proofs.
deferred_blinded_nodes.push((path, result_sender));
// Stop batching once `MAX_DEFERRED_BLINDED_NODES` blinded node jobs are
// deferred, so that they are not starved by further proof batching.
if deferred_blinded_nodes.len() >= MAX_DEFERRED_BLINDED_NODES {
break;
}
}
Err(_) => break,
}
}
let batch_size = batch.senders.len();
batch_metrics.record_batch_size(batch_size);
let (merged_input, senders) = batch.into_input();
trace!(
target: "trie::proof_task",
worker_id,
batch_size,
targets_len = merged_input.targets.len(),
"Processing batched account multiproof"
);
Self::process_batched_account_multiproof(
worker_id,
&proof_tx,
&storage_work_tx,
merged_input,
senders,
&mut account_proofs_processed,
&mut cursor_metrics_cache,
);
// Set next batch to process if we encountered incompatible caches.
next_account_job = pending_incompatible;
}
// Process any deferred blinded node jobs.
for (path, result_sender) in std::mem::take(&mut deferred_blinded_nodes) {
Self::process_blinded_node(
worker_id,
&proof_tx,
path,
result_sender,
&mut account_nodes_processed,
);
}
}
AccountWorkerJob::BlindedAccountNode { path, result_sender } => {
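Note on the hunk above: the drain loop can be read in isolation as the sketch below. It is a simplified model, not the PR's code. `Job`, `cache_id`, `BATCH_LIMIT` and `MAX_DEFERRED` are hypothetical stand-ins for `AccountWorkerJob`, the cache compatibility check in `try_merge`, `ACCOUNT_PROOF_BATCH_LIMIT` and `MAX_DEFERRED_BLINDED_NODES`, and it uses `std::sync::mpsc` rather than crossbeam so that it has no dependencies.

```rust
use std::sync::mpsc::{channel, Receiver};

// Hypothetical stand-ins for the worker's job types.
enum Job {
    Proof { cache_id: u32, target: u64 },
    BlindedNode(u64),
}

const BATCH_LIMIT: usize = 4; // assumed value, for illustration only
const MAX_DEFERRED: usize = 2; // assumed value, for illustration only

/// Starting from `first`, drains queued jobs without blocking. Returns the
/// batches of proof targets plus the blinded-node jobs that were deferred.
fn drain(first: (u32, u64), rx: &Receiver<Job>) -> (Vec<Vec<u64>>, Vec<u64>) {
    let mut batches = Vec::new();
    let mut deferred = Vec::new();
    let mut next = Some(first);
    while let Some((cache_id, target)) = next.take() {
        let mut batch = vec![target];
        while batch.len() < BATCH_LIMIT {
            match rx.try_recv() {
                // Compatible job: merge it into the current batch.
                Ok(Job::Proof { cache_id: other, target }) if other == cache_id => {
                    batch.push(target)
                }
                // Incompatible job: close this batch and start a new one.
                Ok(Job::Proof { cache_id: other, target }) => {
                    next = Some((other, target));
                    break;
                }
                // Blinded node: defer it, but stop batching at the cap.
                Ok(Job::BlindedNode(path)) => {
                    deferred.push(path);
                    if deferred.len() >= MAX_DEFERRED {
                        break;
                    }
                }
                // Queue empty or disconnected: process what we have.
                Err(_) => break,
            }
        }
        batches.push(batch);
    }
    (batches, deferred)
}

/// Example run: two compatible jobs, one blinded node, then two jobs
/// that belong to a different cache.
fn demo_drain() -> (Vec<Vec<u64>>, Vec<u64>) {
    let (tx, rx) = channel();
    tx.send(Job::Proof { cache_id: 1, target: 11 }).unwrap();
    tx.send(Job::BlindedNode(7)).unwrap();
    tx.send(Job::Proof { cache_id: 2, target: 20 }).unwrap();
    tx.send(Job::Proof { cache_id: 2, target: 21 }).unwrap();
    drain((1, 10), &rx)
}
```

In this model the incompatible job is carried over through `next` instead of being dropped or re-queued, which mirrors the role of `pending_incompatible` in the diff.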
@@ -1074,12 +1550,16 @@ where
Ok(())
}
/// Processes an account multiproof request.
fn process_account_multiproof<Provider>(
/// Processes a batched account multiproof request and sends results to all waiting receivers.
///
/// This computes a single account multiproof with merged targets and sends the same result
/// to all original requestors, reducing redundant trie traversals.
fn process_batched_account_multiproof<Provider>(
worker_id: usize,
proof_tx: &ProofTaskTx<Provider>,
storage_work_tx: CrossbeamSender<StorageWorkerJob>,
storage_work_tx: &CrossbeamSender<StorageWorkerJob>,
input: AccountMultiproofInput,
senders: Vec<ProofResultContext>,
account_proofs_processed: &mut u64,
cursor_metrics_cache: &mut ProofTaskCursorMetricsCache,
) where
@@ -1091,21 +1571,21 @@ where
collect_branch_node_masks,
multi_added_removed_keys,
missed_leaves_storage_roots,
proof_result_sender:
ProofResultContext { sender: result_tx, sequence_number: seq, state, start_time: start },
proof_result_sender: _, // We use the senders vec instead
} = input;
let span = debug_span!(
target: "trie::proof_task",
"Account multiproof calculation",
"Batched account multiproof calculation",
targets = targets.len(),
batch_size = senders.len(),
worker_id,
);
let _span_guard = span.enter();
trace!(
target: "trie::proof_task",
"Processing account multiproof"
"Processing batched account multiproof"
);
let proof_start = Instant::now();
@@ -1120,7 +1600,7 @@ where
tracker.set_precomputed_storage_roots(storage_root_targets_len as u64);
let storage_proof_receivers = match dispatch_storage_proofs(
&storage_work_tx,
storage_work_tx,
&targets,
&mut storage_prefix_sets,
collect_branch_node_masks,
@@ -1128,14 +1608,17 @@ where
) {
Ok(receivers) => receivers,
Err(error) => {
// Send error through result channel
error!(target: "trie::proof_task", "Failed to dispatch storage proofs: {error}");
let _ = result_tx.send(ProofResultMessage {
sequence_number: seq,
result: Err(error),
elapsed: start.elapsed(),
state,
});
// Send error to all receivers
let error_msg = error.to_string();
for ProofResultContext { sender, sequence_number, state, start_time } in senders {
*account_proofs_processed += 1;
let _ = sender.send(ProofResultMessage {
sequence_number,
result: Err(ParallelStateRootError::Other(error_msg.clone())),
elapsed: start_time.elapsed(),
state,
});
}
return;
}
};
@@ -1156,46 +1639,75 @@ where
build_account_multiproof_with_storage_roots(&proof_tx.provider, ctx, &mut tracker);
let proof_elapsed = proof_start.elapsed();
let total_elapsed = start.elapsed();
let proof_cursor_metrics = tracker.cursor_metrics;
proof_cursor_metrics.record_spans();
let stats = tracker.finish();
let result = result.map(|proof| ProofResult::AccountMultiproof { proof, stats });
*account_proofs_processed += 1;
// Send result to MultiProofTask
if result_tx
.send(ProofResultMessage {
sequence_number: seq,
result,
elapsed: total_elapsed,
state,
})
.is_err()
{
trace!(
target: "trie::proof_task",
worker_id,
account_proofs_processed,
"Account multiproof receiver dropped, discarding result"
);
// Send the result to all waiting receivers.
let num_senders = senders.len();
match result {
Ok(proof) => {
// Success case: wrap proof in Arc for efficient sharing across all senders.
let proof_result = ProofResult::AccountMultiproof { proof: Arc::new(proof), stats };
for ProofResultContext { sender, sequence_number, state, start_time } in senders {
*account_proofs_processed += 1;
if sender
.send(ProofResultMessage {
sequence_number,
result: Ok(proof_result.clone()),
elapsed: start_time.elapsed(),
state,
})
.is_err()
{
trace!(
target: "trie::proof_task",
worker_id,
sequence_number,
"Account multiproof receiver dropped, discarding result"
);
}
}
}
Err(error) => {
// Error case: convert to string for cloning, then send to all receivers.
let error_msg = error.to_string();
for ProofResultContext { sender, sequence_number, state, start_time } in senders {
*account_proofs_processed += 1;
if sender
.send(ProofResultMessage {
sequence_number,
result: Err(ParallelStateRootError::Other(error_msg.clone())),
elapsed: start_time.elapsed(),
state,
})
.is_err()
{
trace!(
target: "trie::proof_task",
worker_id,
sequence_number,
"Account multiproof receiver dropped, discarding result"
);
}
}
}
}
trace!(
target: "trie::proof_task",
proof_time_us = proof_elapsed.as_micros(),
total_elapsed_us = total_elapsed.as_micros(),
total_processed = account_proofs_processed,
num_senders,
account_trie_cursor_duration_us = proof_cursor_metrics.account_trie_cursor.total_duration.as_micros(),
account_hashed_cursor_duration_us = proof_cursor_metrics.account_hashed_cursor.total_duration.as_micros(),
storage_trie_cursor_duration_us = proof_cursor_metrics.storage_trie_cursor.total_duration.as_micros(),
storage_hashed_cursor_duration_us = proof_cursor_metrics.storage_hashed_cursor.total_duration.as_micros(),
account_trie_cursor_metrics = ?proof_cursor_metrics.account_trie_cursor,
account_hashed_cursor_metrics = ?proof_cursor_metrics.account_hashed_cursor,
storage_trie_cursor_metrics = ?proof_cursor_metrics.storage_trie_cursor,
storage_hashed_cursor_metrics = ?proof_cursor_metrics.storage_hashed_cursor,
"Account multiproof completed"
"Batched account multiproof completed"
);
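Note on the fan-out above: a minimal sketch of sending one computed result to every waiting receiver, assuming a hypothetical `Proof` type and a `(sequence_number, result)` message tuple in place of `ProofResultMessage`. It uses `std::sync::mpsc` channels for self-containment; the PR's channel types may differ.

```rust
use std::sync::mpsc::{channel, Sender};
use std::sync::Arc;

/// Hypothetical stand-in for a proof that is expensive to clone.
#[derive(Debug, Clone, PartialEq)]
struct Proof(Vec<u8>);

/// Message shape assumed for this sketch: (sequence number, result).
type Msg = (u64, Result<Arc<Proof>, String>);

/// Sends the same result to every sender and returns how many sends
/// succeeded. The proof is wrapped in `Arc` once, so each per-receiver
/// clone only bumps a reference count. The error is already a `String`,
/// which is cloned per receiver.
fn fan_out(result: Result<Proof, String>, senders: Vec<(u64, Sender<Msg>)>) -> usize {
    let shared = result.map(Arc::new);
    let mut delivered = 0;
    for (seq, sender) in senders {
        // A failed send means the receiver was dropped; skip it.
        if sender.send((seq, shared.clone())).is_ok() {
            delivered += 1;
        }
    }
    delivered
}

/// Example run: two requesters, one of which has already gone away.
fn demo_fan_out() -> (usize, Msg) {
    let (tx_a, rx_a) = channel::<Msg>();
    let (tx_b, rx_b) = channel::<Msg>();
    drop(rx_b);
    let delivered = fan_out(Ok(Proof(vec![1, 2, 3])), vec![(0, tx_a), (1, tx_b)]);
    (delivered, rx_a.recv().unwrap())
}
```

A dropped receiver only affects its own send, so one requester going away does not prevent the others from getting the shared result.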
#[cfg(feature = "metrics")]
@@ -1338,7 +1850,9 @@ where
drop(_guard);
// Extract storage proof from the result
// Extract storage proof from the result.
// The proof is Arc-wrapped for efficient batch sharing, so we unwrap it
// here.
let proof = match proof_msg.result? {
ProofResult::StorageProof { hashed_address: addr, proof } => {
debug_assert_eq!(
@@ -1346,7 +1860,9 @@ where
hashed_address,
"storage worker must return same address: expected {hashed_address}, got {addr}"
);
proof
// Efficiently unwrap Arc: returns inner value if sole owner, clones
// otherwise.
Arc::try_unwrap(proof).unwrap_or_else(|arc| (*arc).clone())
}
ProofResult::AccountMultiproof { .. } => {
unreachable!("storage worker only sends StorageProof variant")
@@ -1409,8 +1925,11 @@ where
// Consume remaining storage proof receivers for accounts not encountered during trie walk.
for (hashed_address, receiver) in storage_proof_receivers {
if let Ok(proof_msg) = receiver.recv() {
// Extract storage proof from the result
// Extract storage proof from the result.
// The proof is Arc-wrapped for efficient batch sharing, so we unwrap it here.
if let Ok(ProofResult::StorageProof { proof, .. }) = proof_msg.result {
// Efficiently unwrap Arc: returns inner value if sole owner, clones otherwise.
let proof = Arc::try_unwrap(proof).unwrap_or_else(|arc| (*arc).clone());
collected_decoded_storages.insert(hashed_address, proof);
}
}
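Note on the `Arc::try_unwrap(..).unwrap_or_else(..)` pattern used at both extraction sites: the sketch below shows its behavior on a plain `Vec<u8>`. The helper name `take_or_clone` is hypothetical. On Rust 1.76 and later, the standard library's `Arc::unwrap_or_clone` offers the same semantics in one call.

```rust
use std::sync::Arc;

/// Returns the inner value without cloning when `arc` is the only strong
/// reference, and falls back to cloning the contents otherwise.
fn take_or_clone<T: Clone>(arc: Arc<T>) -> T {
    Arc::try_unwrap(arc).unwrap_or_else(|shared| (*shared).clone())
}

/// Example run covering both the sole-owner and the shared case.
fn demo_take_or_clone() -> (Vec<u8>, Vec<u8>, usize) {
    // Sole owner: the vector is moved out, no copy is made.
    let sole = Arc::new(vec![1u8, 2, 3]);
    let moved = take_or_clone(sole);

    // Shared: another handle is still alive, so the contents are cloned
    // and the remaining handle stays valid.
    let shared = Arc::new(vec![4u8, 5]);
    let other = Arc::clone(&shared);
    let cloned = take_or_clone(shared);

    (moved, cloned, Arc::strong_count(&other))
}
```

With a batch of one request the receiver is the sole owner and pays nothing extra. With larger batches, every receiver except the last one to unwrap still clones the proof.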