mirror of
https://github.com/paradigmxyz/reth.git
synced 2026-04-30 03:01:58 -04:00
Compare commits
48 Commits
performanc
...
yk/worker-
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
1ab3dceb3b | ||
|
|
c9601ff5f5 | ||
|
|
3142c355e9 | ||
|
|
9e101dfff8 | ||
|
|
787532f79e | ||
|
|
55ad9f2c17 | ||
|
|
1a8415fcce | ||
|
|
0493bcaefc | ||
|
|
c70c011867 | ||
|
|
efb27b5c1c | ||
|
|
6729cc6075 | ||
|
|
8528e3a3b4 | ||
|
|
e899f0b3ef | ||
|
|
a2c666cd34 | ||
|
|
b32253ab90 | ||
|
|
66f753109a | ||
|
|
80292fa243 | ||
|
|
5696ceda07 | ||
|
|
e22a1def78 | ||
|
|
a650f8c749 | ||
|
|
094b789aca | ||
|
|
01e1daab53 | ||
|
|
1cedc73483 | ||
|
|
ec7286536e | ||
|
|
64ff5485d6 | ||
|
|
0142b80632 | ||
|
|
59d30c2203 | ||
|
|
29f436d073 | ||
|
|
0d8e70e706 | ||
|
|
0bdb90d704 | ||
|
|
85f660e2ca | ||
|
|
96f9948bab | ||
|
|
d03d9317b6 | ||
|
|
157b38d66f | ||
|
|
5910e6a2ed | ||
|
|
265bde6b3b | ||
|
|
10def2ce7d | ||
|
|
5be4f29c1b | ||
|
|
23b8aa3a49 | ||
|
|
a94fba0a80 | ||
|
|
0c79d4387a | ||
|
|
897dc29a1a | ||
|
|
240bf37df0 | ||
|
|
d39ff64ab3 | ||
|
|
2a7c3351cb | ||
|
|
a7bec82416 | ||
|
|
98ec4ce80c | ||
|
|
3318ed0e5c |
File diff suppressed because it is too large
Load Diff
@@ -126,7 +126,8 @@ impl ParallelProof {
|
||||
)))
|
||||
})?;
|
||||
|
||||
// Extract storage proof directly from the result
|
||||
// Extract storage proof directly from the result.
|
||||
// The proof is Arc-wrapped for efficient batch sharing, so we unwrap it here.
|
||||
let storage_proof = match proof_msg.result? {
|
||||
crate::proof_task::ProofResult::StorageProof { hashed_address: addr, proof } => {
|
||||
debug_assert_eq!(
|
||||
@@ -134,7 +135,8 @@ impl ParallelProof {
|
||||
hashed_address,
|
||||
"storage worker must return same address: expected {hashed_address}, got {addr}"
|
||||
);
|
||||
proof
|
||||
// Efficiently unwrap Arc: returns inner value if sole owner, clones otherwise.
|
||||
Arc::try_unwrap(proof).unwrap_or_else(|arc| (*arc).clone())
|
||||
}
|
||||
crate::proof_task::ProofResult::AccountMultiproof { .. } => {
|
||||
unreachable!("storage worker only sends StorageProof variant")
|
||||
@@ -223,8 +225,12 @@ impl ParallelProof {
|
||||
)
|
||||
})?;
|
||||
|
||||
// The proof is Arc-wrapped for efficient batch sharing, so we unwrap it here.
|
||||
let (multiproof, stats) = match proof_result_msg.result? {
|
||||
crate::proof_task::ProofResult::AccountMultiproof { proof, stats } => (proof, stats),
|
||||
crate::proof_task::ProofResult::AccountMultiproof { proof, stats } => {
|
||||
// Efficiently unwrap Arc: returns inner value if sole owner, clones otherwise.
|
||||
(Arc::try_unwrap(proof).unwrap_or_else(|arc| (*arc).clone()), stats)
|
||||
}
|
||||
crate::proof_task::ProofResult::StorageProof { .. } => {
|
||||
unreachable!("account worker only sends AccountMultiproof variant")
|
||||
}
|
||||
|
||||
@@ -41,6 +41,7 @@ use alloy_primitives::{
|
||||
use alloy_rlp::{BufMut, Encodable};
|
||||
use crossbeam_channel::{unbounded, Receiver as CrossbeamReceiver, Sender as CrossbeamSender};
|
||||
use dashmap::DashMap;
|
||||
use metrics::Histogram;
|
||||
use reth_execution_errors::{SparseTrieError, SparseTrieErrorKind};
|
||||
use reth_provider::{DatabaseProviderROFactory, ProviderError, ProviderResult};
|
||||
use reth_storage_errors::db::DatabaseError;
|
||||
@@ -79,6 +80,275 @@ use crate::proof_task_metrics::{
|
||||
type StorageProofResult = Result<DecodedStorageMultiProof, ParallelStateRootError>;
|
||||
type TrieNodeProviderResult = Result<Option<RevealedNode>, SparseTrieError>;
|
||||
|
||||
/// Maximum number of storage proof jobs drained into one batching round, counted across all accounts.
|
||||
const STORAGE_PROOF_BATCH_LIMIT: usize = 32;
|
||||
|
||||
/// Maximum number of blinded node requests to defer while batching storage or account proof jobs.
|
||||
/// When this limit is reached, batching stops early to process deferred nodes,
|
||||
/// preventing starvation of blinded node requests under high proof load.
|
||||
const MAX_DEFERRED_BLINDED_NODES: usize = 16;
|
||||
|
||||
/// Holds batched storage proof jobs for the same account.
|
||||
///
|
||||
/// When multiple storage proof requests arrive for the same account, they can be merged
|
||||
/// into a single proof computation with combined prefix sets and target slots.
|
||||
#[derive(Debug)]
|
||||
struct BatchedStorageProof {
|
||||
/// The merged prefix set from all batched jobs.
|
||||
prefix_set: PrefixSetMut,
|
||||
/// The merged target slots from all batched jobs.
|
||||
target_slots: B256Set,
|
||||
/// Whether any job requested branch node masks.
|
||||
with_branch_node_masks: bool,
|
||||
/// The `multi_added_removed_keys` from the first job (they should all share the same `Arc`).
|
||||
multi_added_removed_keys: Option<Arc<MultiAddedRemovedKeys>>,
|
||||
/// All senders that need to receive the result.
|
||||
senders: Vec<ProofResultContext>,
|
||||
}
|
||||
|
||||
impl BatchedStorageProof {
|
||||
/// Creates a new batch from the first storage proof input.
|
||||
fn new(input: StorageProofInput, sender: ProofResultContext) -> Self {
|
||||
// Convert frozen PrefixSet to mutable PrefixSetMut by collecting its keys.
|
||||
let prefix_set = PrefixSetMut::from(input.prefix_set.iter().copied());
|
||||
Self {
|
||||
prefix_set,
|
||||
target_slots: input.target_slots,
|
||||
with_branch_node_masks: input.with_branch_node_masks,
|
||||
multi_added_removed_keys: input.multi_added_removed_keys,
|
||||
senders: vec![sender],
|
||||
}
|
||||
}
|
||||
|
||||
/// Merges another storage proof job into this batch.
|
||||
///
|
||||
/// # Panics
|
||||
/// Panics if `input.multi_added_removed_keys` does not point to the same Arc as the batch's.
|
||||
/// This is a critical invariant for proof correctness.
|
||||
fn merge(&mut self, input: StorageProofInput, sender: ProofResultContext) {
|
||||
// Validate that all batched jobs share the same multi_added_removed_keys Arc.
|
||||
// This is a critical invariant: if jobs have different keys, the merged proof
|
||||
// would be computed with only the first job's keys, producing incorrect results.
|
||||
// Using assert! (not debug_assert!) because incorrect proofs could cause consensus
|
||||
// failures.
|
||||
assert!(
|
||||
match (&self.multi_added_removed_keys, &input.multi_added_removed_keys) {
|
||||
(Some(a), Some(b)) => Arc::ptr_eq(a, b),
|
||||
(None, None) => true,
|
||||
_ => false,
|
||||
},
|
||||
"All batched storage proof jobs must share the same multi_added_removed_keys Arc"
|
||||
);
|
||||
|
||||
self.prefix_set.extend_keys(input.prefix_set.iter().copied());
|
||||
self.target_slots.extend(input.target_slots);
|
||||
self.with_branch_node_masks |= input.with_branch_node_masks;
|
||||
self.senders.push(sender);
|
||||
}
|
||||
|
||||
/// Converts this batch into a single `StorageProofInput` for computation.
|
||||
fn into_input(self, hashed_address: B256) -> (StorageProofInput, Vec<ProofResultContext>) {
|
||||
let input = StorageProofInput {
|
||||
hashed_address,
|
||||
prefix_set: self.prefix_set.freeze(),
|
||||
target_slots: self.target_slots,
|
||||
with_branch_node_masks: self.with_branch_node_masks,
|
||||
multi_added_removed_keys: self.multi_added_removed_keys,
|
||||
};
|
||||
(input, self.senders)
|
||||
}
|
||||
}
|
||||
|
||||
/// Batch-size metrics recorded by a storage worker.
///
/// Without the `metrics` feature the struct has no fields and recording is a no-op.
/// With the feature, a value built through `Default` leaves the histogram unset and therefore
/// records nothing; use [`Self::new`] to get a recording instance.
#[derive(Clone, Default)]
struct StorageWorkerBatchMetrics {
    /// Distribution of how many jobs were merged into each proof computation.
    #[cfg(feature = "metrics")]
    batch_size_histogram: Option<Histogram>,
}

impl StorageWorkerBatchMetrics {
    /// Creates the metrics handle backed by the
    /// `trie.proof_task.storage_worker_batch_size` histogram.
    #[cfg(feature = "metrics")]
    fn new() -> Self {
        let histogram = metrics::histogram!("trie.proof_task.storage_worker_batch_size");
        Self { batch_size_histogram: Some(histogram) }
    }

    /// Creates the field-less handle used when the `metrics` feature is disabled.
    #[cfg(not(feature = "metrics"))]
    fn new() -> Self {
        Self::default()
    }

    /// Records the number of jobs that were merged into one computation.
    ///
    /// The parameter keeps its leading underscore because it is unused when the `metrics`
    /// feature is disabled.
    fn record_batch_size(&self, _size: usize) {
        #[cfg(feature = "metrics")]
        if let Some(histogram) = self.batch_size_histogram.as_ref() {
            histogram.record(_size as f64);
        }
    }
}
|
||||
|
||||
/// Maximum number of account multiproof jobs to batch together.
|
||||
const ACCOUNT_PROOF_BATCH_LIMIT: usize = 32;
|
||||
|
||||
/// Holds batched account multiproof jobs.
|
||||
///
|
||||
/// When multiple account multiproof requests arrive, they can be merged
|
||||
/// into a single proof computation with combined targets and prefix sets.
|
||||
#[derive(Debug)]
|
||||
struct BatchedAccountProof {
|
||||
/// The merged targets from all batched jobs.
|
||||
targets: MultiProofTargets,
|
||||
/// The merged account prefix set from all batched jobs.
|
||||
account_prefix_set: PrefixSetMut,
|
||||
/// The merged storage prefix sets from all batched jobs.
|
||||
storage_prefix_sets: B256Map<PrefixSetMut>,
|
||||
/// The merged destroyed accounts from all batched jobs.
|
||||
destroyed_accounts: B256Set,
|
||||
/// Whether any job requested branch node masks.
|
||||
collect_branch_node_masks: bool,
|
||||
/// The `multi_added_removed_keys` from the first job (they should all share the same `Arc`).
|
||||
multi_added_removed_keys: Option<Arc<MultiAddedRemovedKeys>>,
|
||||
/// The shared `missed_leaves_storage_roots` cache from the first job.
|
||||
missed_leaves_storage_roots: Arc<DashMap<B256, B256>>,
|
||||
/// All senders that need to receive the result.
|
||||
senders: Vec<ProofResultContext>,
|
||||
}
|
||||
|
||||
impl BatchedAccountProof {
|
||||
/// Creates a new batch from the first account multiproof input.
|
||||
fn new(input: AccountMultiproofInput) -> Self {
|
||||
// Convert frozen prefix sets to mutable versions.
|
||||
let account_prefix_set =
|
||||
PrefixSetMut::from(input.prefix_sets.account_prefix_set.iter().copied());
|
||||
let storage_prefix_sets = input
|
||||
.prefix_sets
|
||||
.storage_prefix_sets
|
||||
.into_iter()
|
||||
.map(|(addr, ps)| (addr, PrefixSetMut::from(ps.iter().copied())))
|
||||
.collect();
|
||||
let destroyed_accounts = input.prefix_sets.destroyed_accounts;
|
||||
|
||||
Self {
|
||||
targets: input.targets,
|
||||
account_prefix_set,
|
||||
storage_prefix_sets,
|
||||
destroyed_accounts,
|
||||
collect_branch_node_masks: input.collect_branch_node_masks,
|
||||
multi_added_removed_keys: input.multi_added_removed_keys,
|
||||
missed_leaves_storage_roots: input.missed_leaves_storage_roots,
|
||||
senders: vec![input.proof_result_sender],
|
||||
}
|
||||
}
|
||||
|
||||
/// Attempts to merge another account multiproof job into this batch.
|
||||
///
|
||||
/// Returns the job back if caches are incompatible so the caller can process it separately.
|
||||
fn try_merge(&mut self, input: AccountMultiproofInput) -> Result<(), AccountMultiproofInput> {
|
||||
// Require all jobs to share the same caches; otherwise merging would produce
|
||||
// incorrect proofs by reusing the wrong retained keys or missed-leaf storage roots.
|
||||
let multi_added_removed_keys_mismatch =
|
||||
!match (&self.multi_added_removed_keys, &input.multi_added_removed_keys) {
|
||||
(Some(a), Some(b)) => Arc::ptr_eq(a, b),
|
||||
(None, None) => true,
|
||||
_ => false,
|
||||
};
|
||||
|
||||
if multi_added_removed_keys_mismatch ||
|
||||
!Arc::ptr_eq(&self.missed_leaves_storage_roots, &input.missed_leaves_storage_roots)
|
||||
{
|
||||
return Err(input);
|
||||
}
|
||||
|
||||
// Merge targets.
|
||||
self.targets.extend(input.targets);
|
||||
|
||||
// Merge account prefix set.
|
||||
self.account_prefix_set.extend_keys(input.prefix_sets.account_prefix_set.iter().copied());
|
||||
|
||||
// Merge storage prefix sets.
|
||||
for (addr, ps) in input.prefix_sets.storage_prefix_sets {
|
||||
match self.storage_prefix_sets.entry(addr) {
|
||||
alloy_primitives::map::Entry::Occupied(mut entry) => {
|
||||
entry.get_mut().extend_keys(ps.iter().copied());
|
||||
}
|
||||
alloy_primitives::map::Entry::Vacant(entry) => {
|
||||
entry.insert(PrefixSetMut::from(ps.iter().copied()));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Merge destroyed accounts.
|
||||
self.destroyed_accounts.extend(input.prefix_sets.destroyed_accounts);
|
||||
|
||||
// OR the branch node masks flag.
|
||||
self.collect_branch_node_masks |= input.collect_branch_node_masks;
|
||||
|
||||
// Collect the sender.
|
||||
self.senders.push(input.proof_result_sender);
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Converts this batch into a single `AccountMultiproofInput` for computation.
|
||||
fn into_input(self) -> (AccountMultiproofInput, Vec<ProofResultContext>) {
|
||||
// Freeze the mutable prefix sets.
|
||||
let storage_prefix_sets: B256Map<PrefixSet> =
|
||||
self.storage_prefix_sets.into_iter().map(|(addr, ps)| (addr, ps.freeze())).collect();
|
||||
|
||||
let prefix_sets = TriePrefixSets {
|
||||
account_prefix_set: self.account_prefix_set.freeze(),
|
||||
storage_prefix_sets,
|
||||
destroyed_accounts: self.destroyed_accounts,
|
||||
};
|
||||
|
||||
// Use a dummy sender for the input since we'll handle all senders separately.
|
||||
let dummy_sender = self.senders.first().expect("batch always has at least one sender");
|
||||
let input = AccountMultiproofInput {
|
||||
targets: self.targets,
|
||||
prefix_sets,
|
||||
collect_branch_node_masks: self.collect_branch_node_masks,
|
||||
multi_added_removed_keys: self.multi_added_removed_keys,
|
||||
missed_leaves_storage_roots: self.missed_leaves_storage_roots,
|
||||
proof_result_sender: dummy_sender.clone(),
|
||||
};
|
||||
(input, self.senders)
|
||||
}
|
||||
}
|
||||
|
||||
/// Batch-size metrics recorded by an account worker.
///
/// Without the `metrics` feature the struct has no fields and recording is a no-op.
/// With the feature, a value built through `Default` leaves the histogram unset and therefore
/// records nothing; use [`Self::new`] to get a recording instance.
#[derive(Clone, Default)]
struct AccountWorkerBatchMetrics {
    /// Distribution of how many jobs were merged into each proof computation.
    #[cfg(feature = "metrics")]
    batch_size_histogram: Option<Histogram>,
}

impl AccountWorkerBatchMetrics {
    /// Creates the metrics handle backed by the
    /// `trie.proof_task.account_worker_batch_size` histogram.
    #[cfg(feature = "metrics")]
    fn new() -> Self {
        let histogram = metrics::histogram!("trie.proof_task.account_worker_batch_size");
        Self { batch_size_histogram: Some(histogram) }
    }

    /// Creates the field-less handle used when the `metrics` feature is disabled.
    #[cfg(not(feature = "metrics"))]
    fn new() -> Self {
        Self::default()
    }

    /// Records the number of jobs that were merged into one computation.
    ///
    /// The parameter keeps its leading underscore because it is unused when the `metrics`
    /// feature is disabled.
    fn record_batch_size(&self, _size: usize) {
        #[cfg(feature = "metrics")]
        if let Some(histogram) = self.batch_size_histogram.as_ref() {
            histogram.record(_size as f64);
        }
    }
}
|
||||
|
||||
/// A handle that provides type-safe access to proof worker pools.
|
||||
///
|
||||
/// The handle stores direct senders to both storage and account worker pools,
|
||||
@@ -552,12 +822,16 @@ impl TrieNodeProvider for ProofTaskTrieNodeProvider {
|
||||
}
|
||||
}
|
||||
/// Result of a proof calculation, which can be either an account multiproof or a storage proof.
|
||||
#[derive(Debug)]
|
||||
///
|
||||
/// The proof data is wrapped in `Arc` to enable efficient sharing when batching multiple
|
||||
/// proof requests. This avoids expensive cloning of the underlying proof structures
|
||||
/// when sending results to multiple receivers.
|
||||
#[derive(Debug, Clone)]
|
||||
pub enum ProofResult {
|
||||
/// Account multiproof with statistics
|
||||
AccountMultiproof {
|
||||
/// The account multiproof
|
||||
proof: DecodedMultiProof,
|
||||
/// The account multiproof (Arc-wrapped for efficient sharing in batches)
|
||||
proof: Arc<DecodedMultiProof>,
|
||||
/// Statistics collected during proof computation
|
||||
stats: ParallelTrieStats,
|
||||
},
|
||||
@@ -565,8 +839,8 @@ pub enum ProofResult {
|
||||
StorageProof {
|
||||
/// The hashed address this storage proof belongs to
|
||||
hashed_address: B256,
|
||||
/// The storage multiproof
|
||||
proof: DecodedStorageMultiProof,
|
||||
/// The storage multiproof (Arc-wrapped for efficient sharing in batches)
|
||||
proof: Arc<DecodedStorageMultiProof>,
|
||||
},
|
||||
}
|
||||
|
||||
@@ -575,11 +849,17 @@ impl ProofResult {
|
||||
///
|
||||
/// For account multiproofs, returns the multiproof directly (discarding stats).
|
||||
/// For storage proofs, wraps the storage proof into a minimal multiproof.
|
||||
///
|
||||
/// Note: This method only clones the inner proof data when the `Arc` is still shared;
|
||||
/// a sole owner gets the inner value back without a clone via `Arc::try_unwrap`.
|
||||
pub fn into_multiproof(self) -> DecodedMultiProof {
|
||||
match self {
|
||||
Self::AccountMultiproof { proof, stats: _ } => proof,
|
||||
Self::AccountMultiproof { proof, stats: _ } => {
|
||||
Arc::try_unwrap(proof).unwrap_or_else(|arc| (*arc).clone())
|
||||
}
|
||||
Self::StorageProof { hashed_address, proof } => {
|
||||
DecodedMultiProof::from_storage_proof(hashed_address, proof)
|
||||
let storage_proof = Arc::try_unwrap(proof).unwrap_or_else(|arc| (*arc).clone());
|
||||
DecodedMultiProof::from_storage_proof(hashed_address, storage_proof)
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -708,11 +988,18 @@ where
|
||||
/// 2. Advertises availability
|
||||
/// 3. Processes jobs in a loop:
|
||||
/// - Receives job from channel
|
||||
/// - Drains additional storage proof jobs from the queue and groups them by account (batching)
|
||||
/// - Marks worker as busy
|
||||
/// - Processes the job
|
||||
/// - Processes the batched jobs as a single proof computation
|
||||
/// - Marks worker as available
|
||||
/// 4. Shuts down when channel closes
|
||||
///
|
||||
/// # Batching Strategy
|
||||
///
|
||||
/// When multiple storage proof requests arrive for the same account, they are merged
|
||||
/// into a single proof computation. This reduces redundant trie traversals when state
|
||||
/// updates arrive faster than proof computation can process them.
|
||||
///
|
||||
/// # Panic Safety
|
||||
///
|
||||
/// If this function panics, the worker thread terminates but other workers
|
||||
@@ -732,6 +1019,7 @@ where
|
||||
// Create provider from factory
|
||||
let provider = task_ctx.factory.database_provider_ro()?;
|
||||
let proof_tx = ProofTaskTx::new(provider, worker_id);
|
||||
let batch_metrics = StorageWorkerBatchMetrics::new();
|
||||
|
||||
trace!(
|
||||
target: "trie::proof_task",
|
||||
@@ -746,20 +1034,104 @@ where
|
||||
// Initially mark this worker as available.
|
||||
available_workers.fetch_add(1, Ordering::Relaxed);
|
||||
|
||||
// Deferred blinded node jobs to process after batched storage proofs.
|
||||
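// Pre-allocate with capacity for the first batching round.
// NOTE(review): the `std::mem::take` used to drain this Vec leaves an unallocated Vec behind,
// so later rounds reallocate; `drain(..)` would keep the capacity.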
|
||||
let mut deferred_blinded_nodes: Vec<(B256, Nibbles, Sender<TrieNodeProviderResult>)> =
|
||||
Vec::with_capacity(MAX_DEFERRED_BLINDED_NODES);
|
||||
|
||||
while let Ok(job) = work_rx.recv() {
|
||||
// Mark worker as busy.
|
||||
available_workers.fetch_sub(1, Ordering::Relaxed);
|
||||
|
||||
match job {
|
||||
StorageWorkerJob::StorageProof { input, proof_result_sender } => {
|
||||
Self::process_storage_proof(
|
||||
worker_id,
|
||||
&proof_tx,
|
||||
input,
|
||||
proof_result_sender,
|
||||
&mut storage_proofs_processed,
|
||||
&mut cursor_metrics_cache,
|
||||
// Start batching: group storage proofs by account.
|
||||
let mut batches: B256Map<BatchedStorageProof> = B256Map::default();
|
||||
batches.insert(
|
||||
input.hashed_address,
|
||||
BatchedStorageProof::new(input, proof_result_sender),
|
||||
);
|
||||
let mut total_jobs = 1usize;
|
||||
|
||||
// Drain additional jobs from the queue.
|
||||
while total_jobs < STORAGE_PROOF_BATCH_LIMIT {
|
||||
match work_rx.try_recv() {
|
||||
Ok(StorageWorkerJob::StorageProof {
|
||||
input: next_input,
|
||||
proof_result_sender: next_sender,
|
||||
}) => {
|
||||
total_jobs += 1;
|
||||
let addr = next_input.hashed_address;
|
||||
match batches.entry(addr) {
|
||||
alloy_primitives::map::Entry::Occupied(mut entry) => {
|
||||
entry.get_mut().merge(next_input, next_sender);
|
||||
}
|
||||
alloy_primitives::map::Entry::Vacant(entry) => {
|
||||
entry.insert(BatchedStorageProof::new(
|
||||
next_input,
|
||||
next_sender,
|
||||
));
|
||||
}
|
||||
}
|
||||
}
|
||||
Ok(StorageWorkerJob::BlindedStorageNode {
|
||||
account,
|
||||
path,
|
||||
result_sender,
|
||||
}) => {
|
||||
// Defer blinded node jobs to process after batched proofs.
|
||||
deferred_blinded_nodes.push((account, path, result_sender));
|
||||
// Stop batching if too many blinded nodes are deferred to prevent
|
||||
// starvation.
|
||||
if deferred_blinded_nodes.len() >= MAX_DEFERRED_BLINDED_NODES {
|
||||
break;
|
||||
}
|
||||
}
|
||||
Err(_) => break,
|
||||
}
|
||||
}
|
||||
|
||||
// Process all batched storage proofs.
|
||||
for (hashed_address, batch) in batches {
|
||||
let batch_size = batch.senders.len();
|
||||
batch_metrics.record_batch_size(batch_size);
|
||||
|
||||
let (merged_input, senders) = batch.into_input(hashed_address);
|
||||
|
||||
trace!(
|
||||
target: "trie::proof_task",
|
||||
worker_id,
|
||||
?hashed_address,
|
||||
batch_size,
|
||||
prefix_set_len = merged_input.prefix_set.len(),
|
||||
target_slots_len = merged_input.target_slots.len(),
|
||||
"Processing batched storage proof"
|
||||
);
|
||||
|
||||
Self::process_batched_storage_proof(
|
||||
worker_id,
|
||||
&proof_tx,
|
||||
hashed_address,
|
||||
merged_input,
|
||||
senders,
|
||||
&mut storage_proofs_processed,
|
||||
&mut cursor_metrics_cache,
|
||||
);
|
||||
}
|
||||
|
||||
// Process any deferred blinded node jobs.
|
||||
for (account, path, result_sender) in
|
||||
std::mem::take(&mut deferred_blinded_nodes)
|
||||
{
|
||||
Self::process_blinded_node(
|
||||
worker_id,
|
||||
&proof_tx,
|
||||
account,
|
||||
path,
|
||||
result_sender,
|
||||
&mut storage_nodes_processed,
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
StorageWorkerJob::BlindedStorageNode { account, path, result_sender } => {
|
||||
@@ -795,82 +1167,103 @@ where
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Processes a storage proof request.
|
||||
fn process_storage_proof<Provider>(
|
||||
/// Processes a batched storage proof request and sends results to all waiting receivers.
|
||||
///
|
||||
/// This computes a single storage proof with merged targets and sends the same result
|
||||
/// to all original requestors, reducing redundant trie traversals.
|
||||
fn process_batched_storage_proof<Provider>(
|
||||
worker_id: usize,
|
||||
proof_tx: &ProofTaskTx<Provider>,
|
||||
hashed_address: B256,
|
||||
input: StorageProofInput,
|
||||
proof_result_sender: ProofResultContext,
|
||||
senders: Vec<ProofResultContext>,
|
||||
storage_proofs_processed: &mut u64,
|
||||
cursor_metrics_cache: &mut ProofTaskCursorMetricsCache,
|
||||
) where
|
||||
Provider: TrieCursorFactory + HashedCursorFactory,
|
||||
{
|
||||
let hashed_address = input.hashed_address;
|
||||
let ProofResultContext { sender, sequence_number: seq, state, start_time } =
|
||||
proof_result_sender;
|
||||
|
||||
let mut trie_cursor_metrics = TrieCursorMetricsCache::default();
|
||||
let mut hashed_cursor_metrics = HashedCursorMetricsCache::default();
|
||||
|
||||
trace!(
|
||||
target: "trie::proof_task",
|
||||
worker_id,
|
||||
hashed_address = ?hashed_address,
|
||||
prefix_set_len = input.prefix_set.len(),
|
||||
target_slots_len = input.target_slots.len(),
|
||||
"Processing storage proof"
|
||||
);
|
||||
|
||||
let proof_start = Instant::now();
|
||||
let result = proof_tx.compute_storage_proof(
|
||||
input,
|
||||
&mut trie_cursor_metrics,
|
||||
&mut hashed_cursor_metrics,
|
||||
);
|
||||
|
||||
let proof_elapsed = proof_start.elapsed();
|
||||
*storage_proofs_processed += 1;
|
||||
|
||||
let result_msg = result.map(|storage_proof| ProofResult::StorageProof {
|
||||
hashed_address,
|
||||
proof: storage_proof,
|
||||
});
|
||||
// Send the result to all waiting receivers.
|
||||
let num_senders = senders.len();
|
||||
match result {
|
||||
Ok(storage_proof) => {
|
||||
// Success case: wrap proof in Arc for efficient sharing across all senders.
|
||||
let proof_result =
|
||||
ProofResult::StorageProof { hashed_address, proof: Arc::new(storage_proof) };
|
||||
|
||||
if sender
|
||||
.send(ProofResultMessage {
|
||||
sequence_number: seq,
|
||||
result: result_msg,
|
||||
elapsed: start_time.elapsed(),
|
||||
state,
|
||||
})
|
||||
.is_err()
|
||||
{
|
||||
trace!(
|
||||
target: "trie::proof_task",
|
||||
worker_id,
|
||||
hashed_address = ?hashed_address,
|
||||
storage_proofs_processed,
|
||||
"Proof result receiver dropped, discarding result"
|
||||
);
|
||||
for ProofResultContext { sender, sequence_number, state, start_time } in senders {
|
||||
*storage_proofs_processed += 1;
|
||||
|
||||
if sender
|
||||
.send(ProofResultMessage {
|
||||
sequence_number,
|
||||
result: Ok(proof_result.clone()),
|
||||
elapsed: start_time.elapsed(),
|
||||
state,
|
||||
})
|
||||
.is_err()
|
||||
{
|
||||
trace!(
|
||||
target: "trie::proof_task",
|
||||
worker_id,
|
||||
?hashed_address,
|
||||
sequence_number,
|
||||
"Proof result receiver dropped, discarding result"
|
||||
);
|
||||
}
|
||||
}
|
||||
}
|
||||
Err(error) => {
|
||||
// Error case: convert to string for cloning, then send to all receivers.
|
||||
let error_msg = error.to_string();
|
||||
|
||||
for ProofResultContext { sender, sequence_number, state, start_time } in senders {
|
||||
*storage_proofs_processed += 1;
|
||||
|
||||
if sender
|
||||
.send(ProofResultMessage {
|
||||
sequence_number,
|
||||
result: Err(ParallelStateRootError::Other(error_msg.clone())),
|
||||
elapsed: start_time.elapsed(),
|
||||
state,
|
||||
})
|
||||
.is_err()
|
||||
{
|
||||
trace!(
|
||||
target: "trie::proof_task",
|
||||
worker_id,
|
||||
?hashed_address,
|
||||
sequence_number,
|
||||
"Proof result receiver dropped, discarding result"
|
||||
);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
trace!(
|
||||
target: "trie::proof_task",
|
||||
worker_id,
|
||||
hashed_address = ?hashed_address,
|
||||
?hashed_address,
|
||||
proof_time_us = proof_elapsed.as_micros(),
|
||||
total_processed = storage_proofs_processed,
|
||||
num_senders,
|
||||
trie_cursor_duration_us = trie_cursor_metrics.total_duration.as_micros(),
|
||||
hashed_cursor_duration_us = hashed_cursor_metrics.total_duration.as_micros(),
|
||||
?trie_cursor_metrics,
|
||||
?hashed_cursor_metrics,
|
||||
"Storage proof completed"
|
||||
"Batched storage proof completed"
|
||||
);
|
||||
|
||||
#[cfg(feature = "metrics")]
|
||||
{
|
||||
// Accumulate per-proof metrics into the worker's cache
|
||||
let per_proof_cache = ProofTaskCursorMetricsCache {
|
||||
account_trie_cursor: TrieCursorMetricsCache::default(),
|
||||
account_hashed_cursor: HashedCursorMetricsCache::default(),
|
||||
@@ -987,11 +1380,18 @@ where
|
||||
/// 2. Advertises availability
|
||||
/// 3. Processes jobs in a loop:
|
||||
/// - Receives job from channel
|
||||
/// - Drains additional account multiproof jobs (batching)
|
||||
/// - Marks worker as busy
|
||||
/// - Processes the job
|
||||
/// - Processes the batched jobs as a single proof computation
|
||||
/// - Marks worker as available
|
||||
/// 4. Shuts down when channel closes
|
||||
///
|
||||
/// # Batching Strategy
|
||||
///
|
||||
/// When multiple account multiproof requests arrive, they are merged into
|
||||
/// a single proof computation. This reduces redundant trie traversals when
|
||||
/// state updates arrive faster than proof computation can process them.
|
||||
///
|
||||
/// # Panic Safety
|
||||
///
|
||||
/// If this function panics, the worker thread terminates but other workers
|
||||
@@ -1012,6 +1412,7 @@ where
|
||||
// Create provider from factory
|
||||
let provider = task_ctx.factory.database_provider_ro()?;
|
||||
let proof_tx = ProofTaskTx::new(provider, worker_id);
|
||||
let batch_metrics = AccountWorkerBatchMetrics::new();
|
||||
|
||||
trace!(
|
||||
target: "trie::proof_task",
|
||||
@@ -1026,20 +1427,95 @@ where
|
||||
// Count this worker as available only after successful initialization.
|
||||
available_workers.fetch_add(1, Ordering::Relaxed);
|
||||
|
||||
// Deferred blinded node jobs to process after batched account proofs.
|
||||
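// Pre-allocate with capacity for the first batching round.
// NOTE(review): the `std::mem::take` used to drain this Vec leaves an unallocated Vec behind,
// so later rounds reallocate; `drain(..)` would keep the capacity.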
|
||||
let mut deferred_blinded_nodes: Vec<(Nibbles, Sender<TrieNodeProviderResult>)> =
|
||||
Vec::with_capacity(MAX_DEFERRED_BLINDED_NODES);
|
||||
|
||||
while let Ok(job) = work_rx.recv() {
|
||||
// Mark worker as busy.
|
||||
available_workers.fetch_sub(1, Ordering::Relaxed);
|
||||
|
||||
match job {
|
||||
AccountWorkerJob::AccountMultiproof { input } => {
|
||||
Self::process_account_multiproof(
|
||||
worker_id,
|
||||
&proof_tx,
|
||||
storage_work_tx.clone(),
|
||||
*input,
|
||||
&mut account_proofs_processed,
|
||||
&mut cursor_metrics_cache,
|
||||
);
|
||||
// Start batching: accumulate account multiproof jobs. If we encounter an
|
||||
// incompatible job (different caches), process it as a separate batch.
|
||||
let mut next_account_job: Option<Box<AccountMultiproofInput>> = Some(input);
|
||||
|
||||
while let Some(account_job) = next_account_job.take() {
|
||||
let mut batch = BatchedAccountProof::new(*account_job);
|
||||
let mut pending_incompatible: Option<Box<AccountMultiproofInput>> = None;
|
||||
|
||||
// Drain additional jobs from the queue.
|
||||
while batch.senders.len() < ACCOUNT_PROOF_BATCH_LIMIT {
|
||||
match work_rx.try_recv() {
|
||||
Ok(AccountWorkerJob::AccountMultiproof { input: next_input }) => {
|
||||
match batch.try_merge(*next_input) {
|
||||
Ok(()) => {}
|
||||
Err(incompatible) => {
|
||||
trace!(
|
||||
target: "trie::proof_task",
|
||||
worker_id,
|
||||
"Account multiproof batch split due to incompatible caches"
|
||||
);
|
||||
pending_incompatible = Some(Box::new(incompatible));
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
Ok(AccountWorkerJob::BlindedAccountNode {
|
||||
path,
|
||||
result_sender,
|
||||
}) => {
|
||||
// Defer blinded node jobs to process after batched proofs.
|
||||
deferred_blinded_nodes.push((path, result_sender));
|
||||
// Stop batching if too many blinded nodes are deferred to
|
||||
// prevent starvation.
|
||||
if deferred_blinded_nodes.len() >= MAX_DEFERRED_BLINDED_NODES {
|
||||
break;
|
||||
}
|
||||
}
|
||||
Err(_) => break,
|
||||
}
|
||||
}
|
||||
|
||||
let batch_size = batch.senders.len();
|
||||
batch_metrics.record_batch_size(batch_size);
|
||||
|
||||
let (merged_input, senders) = batch.into_input();
|
||||
|
||||
trace!(
|
||||
target: "trie::proof_task",
|
||||
worker_id,
|
||||
batch_size,
|
||||
targets_len = merged_input.targets.len(),
|
||||
"Processing batched account multiproof"
|
||||
);
|
||||
|
||||
Self::process_batched_account_multiproof(
|
||||
worker_id,
|
||||
&proof_tx,
|
||||
&storage_work_tx,
|
||||
merged_input,
|
||||
senders,
|
||||
&mut account_proofs_processed,
|
||||
&mut cursor_metrics_cache,
|
||||
);
|
||||
|
||||
// Set next batch to process if we encountered incompatible caches.
|
||||
next_account_job = pending_incompatible;
|
||||
}
|
||||
|
||||
// Process any deferred blinded node jobs.
|
||||
for (path, result_sender) in std::mem::take(&mut deferred_blinded_nodes) {
|
||||
Self::process_blinded_node(
|
||||
worker_id,
|
||||
&proof_tx,
|
||||
path,
|
||||
result_sender,
|
||||
&mut account_nodes_processed,
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
AccountWorkerJob::BlindedAccountNode { path, result_sender } => {
|
||||
@@ -1074,12 +1550,16 @@ where
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Processes an account multiproof request.
|
||||
fn process_account_multiproof<Provider>(
|
||||
/// Processes a batched account multiproof request and sends results to all waiting receivers.
|
||||
///
|
||||
/// This computes a single account multiproof with merged targets and sends the same result
|
||||
/// to all original requestors, reducing redundant trie traversals.
|
||||
fn process_batched_account_multiproof<Provider>(
|
||||
worker_id: usize,
|
||||
proof_tx: &ProofTaskTx<Provider>,
|
||||
storage_work_tx: CrossbeamSender<StorageWorkerJob>,
|
||||
storage_work_tx: &CrossbeamSender<StorageWorkerJob>,
|
||||
input: AccountMultiproofInput,
|
||||
senders: Vec<ProofResultContext>,
|
||||
account_proofs_processed: &mut u64,
|
||||
cursor_metrics_cache: &mut ProofTaskCursorMetricsCache,
|
||||
) where
|
||||
@@ -1091,21 +1571,21 @@ where
|
||||
collect_branch_node_masks,
|
||||
multi_added_removed_keys,
|
||||
missed_leaves_storage_roots,
|
||||
proof_result_sender:
|
||||
ProofResultContext { sender: result_tx, sequence_number: seq, state, start_time: start },
|
||||
proof_result_sender: _, // We use the senders vec instead
|
||||
} = input;
|
||||
|
||||
let span = debug_span!(
|
||||
target: "trie::proof_task",
|
||||
"Account multiproof calculation",
|
||||
"Batched account multiproof calculation",
|
||||
targets = targets.len(),
|
||||
batch_size = senders.len(),
|
||||
worker_id,
|
||||
);
|
||||
let _span_guard = span.enter();
|
||||
|
||||
trace!(
|
||||
target: "trie::proof_task",
|
||||
"Processing account multiproof"
|
||||
"Processing batched account multiproof"
|
||||
);
|
||||
|
||||
let proof_start = Instant::now();
|
||||
@@ -1120,7 +1600,7 @@ where
|
||||
tracker.set_precomputed_storage_roots(storage_root_targets_len as u64);
|
||||
|
||||
let storage_proof_receivers = match dispatch_storage_proofs(
|
||||
&storage_work_tx,
|
||||
storage_work_tx,
|
||||
&targets,
|
||||
&mut storage_prefix_sets,
|
||||
collect_branch_node_masks,
|
||||
@@ -1128,14 +1608,17 @@ where
|
||||
) {
|
||||
Ok(receivers) => receivers,
|
||||
Err(error) => {
|
||||
// Send error through result channel
|
||||
error!(target: "trie::proof_task", "Failed to dispatch storage proofs: {error}");
|
||||
let _ = result_tx.send(ProofResultMessage {
|
||||
sequence_number: seq,
|
||||
result: Err(error),
|
||||
elapsed: start.elapsed(),
|
||||
state,
|
||||
});
|
||||
// Send error to all receivers
|
||||
let error_msg = error.to_string();
|
||||
for ProofResultContext { sender, sequence_number, state, start_time } in senders {
|
||||
*account_proofs_processed += 1;
|
||||
let _ = sender.send(ProofResultMessage {
|
||||
sequence_number,
|
||||
result: Err(ParallelStateRootError::Other(error_msg.clone())),
|
||||
elapsed: start_time.elapsed(),
|
||||
state,
|
||||
});
|
||||
}
|
||||
return;
|
||||
}
|
||||
};
|
||||
@@ -1156,46 +1639,75 @@ where
|
||||
build_account_multiproof_with_storage_roots(&proof_tx.provider, ctx, &mut tracker);
|
||||
|
||||
let proof_elapsed = proof_start.elapsed();
|
||||
let total_elapsed = start.elapsed();
|
||||
let proof_cursor_metrics = tracker.cursor_metrics;
|
||||
proof_cursor_metrics.record_spans();
|
||||
|
||||
let stats = tracker.finish();
|
||||
let result = result.map(|proof| ProofResult::AccountMultiproof { proof, stats });
|
||||
*account_proofs_processed += 1;
|
||||
|
||||
// Send result to MultiProofTask
|
||||
if result_tx
|
||||
.send(ProofResultMessage {
|
||||
sequence_number: seq,
|
||||
result,
|
||||
elapsed: total_elapsed,
|
||||
state,
|
||||
})
|
||||
.is_err()
|
||||
{
|
||||
trace!(
|
||||
target: "trie::proof_task",
|
||||
worker_id,
|
||||
account_proofs_processed,
|
||||
"Account multiproof receiver dropped, discarding result"
|
||||
);
|
||||
// Send the result to all waiting receivers.
|
||||
let num_senders = senders.len();
|
||||
match result {
|
||||
Ok(proof) => {
|
||||
// Success case: wrap proof in Arc for efficient sharing across all senders.
|
||||
let proof_result = ProofResult::AccountMultiproof { proof: Arc::new(proof), stats };
|
||||
|
||||
for ProofResultContext { sender, sequence_number, state, start_time } in senders {
|
||||
*account_proofs_processed += 1;
|
||||
|
||||
if sender
|
||||
.send(ProofResultMessage {
|
||||
sequence_number,
|
||||
result: Ok(proof_result.clone()),
|
||||
elapsed: start_time.elapsed(),
|
||||
state,
|
||||
})
|
||||
.is_err()
|
||||
{
|
||||
trace!(
|
||||
target: "trie::proof_task",
|
||||
worker_id,
|
||||
sequence_number,
|
||||
"Account multiproof receiver dropped, discarding result"
|
||||
);
|
||||
}
|
||||
}
|
||||
}
|
||||
Err(error) => {
|
||||
// Error case: convert to string for cloning, then send to all receivers.
|
||||
let error_msg = error.to_string();
|
||||
|
||||
for ProofResultContext { sender, sequence_number, state, start_time } in senders {
|
||||
*account_proofs_processed += 1;
|
||||
|
||||
if sender
|
||||
.send(ProofResultMessage {
|
||||
sequence_number,
|
||||
result: Err(ParallelStateRootError::Other(error_msg.clone())),
|
||||
elapsed: start_time.elapsed(),
|
||||
state,
|
||||
})
|
||||
.is_err()
|
||||
{
|
||||
trace!(
|
||||
target: "trie::proof_task",
|
||||
worker_id,
|
||||
sequence_number,
|
||||
"Account multiproof receiver dropped, discarding result"
|
||||
);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
trace!(
|
||||
target: "trie::proof_task",
|
||||
proof_time_us = proof_elapsed.as_micros(),
|
||||
total_elapsed_us = total_elapsed.as_micros(),
|
||||
total_processed = account_proofs_processed,
|
||||
num_senders,
|
||||
account_trie_cursor_duration_us = proof_cursor_metrics.account_trie_cursor.total_duration.as_micros(),
|
||||
account_hashed_cursor_duration_us = proof_cursor_metrics.account_hashed_cursor.total_duration.as_micros(),
|
||||
storage_trie_cursor_duration_us = proof_cursor_metrics.storage_trie_cursor.total_duration.as_micros(),
|
||||
storage_hashed_cursor_duration_us = proof_cursor_metrics.storage_hashed_cursor.total_duration.as_micros(),
|
||||
account_trie_cursor_metrics = ?proof_cursor_metrics.account_trie_cursor,
|
||||
account_hashed_cursor_metrics = ?proof_cursor_metrics.account_hashed_cursor,
|
||||
storage_trie_cursor_metrics = ?proof_cursor_metrics.storage_trie_cursor,
|
||||
storage_hashed_cursor_metrics = ?proof_cursor_metrics.storage_hashed_cursor,
|
||||
"Account multiproof completed"
|
||||
"Batched account multiproof completed"
|
||||
);
|
||||
|
||||
#[cfg(feature = "metrics")]
|
||||
@@ -1338,7 +1850,9 @@ where
|
||||
|
||||
drop(_guard);
|
||||
|
||||
// Extract storage proof from the result
|
||||
// Extract storage proof from the result.
|
||||
// The proof is Arc-wrapped for efficient batch sharing, so we unwrap it
|
||||
// here.
|
||||
let proof = match proof_msg.result? {
|
||||
ProofResult::StorageProof { hashed_address: addr, proof } => {
|
||||
debug_assert_eq!(
|
||||
@@ -1346,7 +1860,9 @@ where
|
||||
hashed_address,
|
||||
"storage worker must return same address: expected {hashed_address}, got {addr}"
|
||||
);
|
||||
proof
|
||||
// Efficiently unwrap Arc: returns inner value if sole owner, clones
|
||||
// otherwise.
|
||||
Arc::try_unwrap(proof).unwrap_or_else(|arc| (*arc).clone())
|
||||
}
|
||||
ProofResult::AccountMultiproof { .. } => {
|
||||
unreachable!("storage worker only sends StorageProof variant")
|
||||
@@ -1409,8 +1925,11 @@ where
|
||||
// Consume remaining storage proof receivers for accounts not encountered during trie walk.
|
||||
for (hashed_address, receiver) in storage_proof_receivers {
|
||||
if let Ok(proof_msg) = receiver.recv() {
|
||||
// Extract storage proof from the result
|
||||
// Extract storage proof from the result.
|
||||
// The proof is Arc-wrapped for efficient batch sharing, so we unwrap it here.
|
||||
if let Ok(ProofResult::StorageProof { proof, .. }) = proof_msg.result {
|
||||
// Efficiently unwrap Arc: returns inner value if sole owner, clones otherwise.
|
||||
let proof = Arc::try_unwrap(proof).unwrap_or_else(|arc| (*arc).clone());
|
||||
collected_decoded_storages.insert(hashed_address, proof);
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user