From bff11ab663aa87b475e23cd34da18988471d7b8e Mon Sep 17 00:00:00 2001 From: Brian Picciano Date: Tue, 27 Jan 2026 15:54:56 +0100 Subject: [PATCH] refactor(trie): reuse shared StorageProofCalculator for V2 sync storage roots and add deferred encoder metrics (#21424) Co-authored-by: Amp --- crates/engine/primitives/src/config.rs | 16 -- .../tree/src/tree/payload_processor/mod.rs | 4 +- crates/trie/parallel/src/proof_task.rs | 207 ++++++++++++------ .../trie/parallel/src/proof_task_metrics.rs | 30 +++ crates/trie/parallel/src/value_encoder.rs | 172 +++++++++++---- crates/trie/trie/src/proof_v2/value.rs | 27 +-- 6 files changed, 295 insertions(+), 161 deletions(-) diff --git a/crates/engine/primitives/src/config.rs b/crates/engine/primitives/src/config.rs index 6b17a196fd..aecf92eb48 100644 --- a/crates/engine/primitives/src/config.rs +++ b/crates/engine/primitives/src/config.rs @@ -34,11 +34,6 @@ fn default_account_worker_count() -> usize { /// The size of proof targets chunk to spawn in one multiproof calculation. pub const DEFAULT_MULTIPROOF_TASK_CHUNK_SIZE: usize = 60; -/// The size of proof targets chunk to spawn in one multiproof calculation when V2 proofs are -/// enabled. This is 4x the default chunk size to take advantage of more efficient V2 proof -/// computation. -pub const DEFAULT_MULTIPROOF_TASK_CHUNK_SIZE_V2: usize = DEFAULT_MULTIPROOF_TASK_CHUNK_SIZE * 4; - /// Default number of reserved CPU cores for non-reth processes. /// /// This will be deducted from the thread count of main reth global threadpool. @@ -277,17 +272,6 @@ impl TreeConfig { self.multiproof_chunk_size } - /// Return the multiproof task chunk size, using the V2 default if V2 proofs are enabled - /// and the chunk size is at the default value. - pub const fn effective_multiproof_chunk_size(&self) -> usize { - if self.enable_proof_v2 && self.multiproof_chunk_size == DEFAULT_MULTIPROOF_TASK_CHUNK_SIZE - { - DEFAULT_MULTIPROOF_TASK_CHUNK_SIZE_V2 - } else { - self.multiproof_chunk_size - } - } - /// Return the number of reserved CPU cores for non-reth processes pub const fn reserved_cpu_cores(&self) -> usize { self.reserved_cpu_cores diff --git a/crates/engine/tree/src/tree/payload_processor/mod.rs b/crates/engine/tree/src/tree/payload_processor/mod.rs index dc2ac40068..5d5fac93ee 100644 --- a/crates/engine/tree/src/tree/payload_processor/mod.rs +++ b/crates/engine/tree/src/tree/payload_processor/mod.rs @@ -286,9 +286,7 @@ where let multi_proof_task = MultiProofTask::new( proof_handle.clone(), to_sparse_trie, - config - .multiproof_chunking_enabled() - .then_some(config.effective_multiproof_chunk_size()), + config.multiproof_chunking_enabled().then_some(config.multiproof_chunk_size()), to_multi_proof.clone(), from_multi_proof, ) diff --git a/crates/trie/parallel/src/proof_task.rs b/crates/trie/parallel/src/proof_task.rs index d8073d1acf..3cbf5293d8 100644 --- a/crates/trie/parallel/src/proof_task.rs +++ b/crates/trie/parallel/src/proof_task.rs @@ -33,7 +33,7 @@ use crate::{ root::ParallelStateRootError, stats::{ParallelTrieStats, ParallelTrieTracker}, targets_v2::MultiProofTargetsV2, - value_encoder::AsyncAccountValueEncoder, + value_encoder::{AsyncAccountValueEncoder, ValueEncoderStats}, StorageRootTargets, }; use alloy_primitives::{ @@ -65,6 +65,8 @@ use reth_trie_common::{ }; use reth_trie_sparse::provider::{RevealedNode, TrieNodeProvider, TrieNodeProviderFactory}; use std::{ + cell::RefCell, + rc::Rc, sync::{ atomic::{AtomicUsize, Ordering}, mpsc::{channel, Receiver, Sender}, @@ -82,6 +84,22 @@ use crate::proof_task_metrics::{ type TrieNodeProviderResult = Result, SparseTrieError>; +/// Type alias for the V2 account proof calculator. +type V2AccountProofCalculator<'a, Provider> = proof_v2::ProofCalculator< + ::AccountTrieCursor<'a>, + ::AccountCursor<'a>, + AsyncAccountValueEncoder< + ::StorageTrieCursor<'a>, + ::StorageCursor<'a>, + >, +>; + +/// Type alias for the V2 storage proof calculator. +type V2StorageProofCalculator<'a, Provider> = proof_v2::StorageProofCalculator< + ::StorageTrieCursor<'a>, + ::StorageCursor<'a>, +>; + /// A handle that provides type-safe access to proof worker pools. /// /// The handle stores direct senders to both storage and account worker pools, @@ -543,15 +561,6 @@ where ProofBlindedStorageProvider::new(&self.provider, &self.provider, account); storage_node_provider.trie_node(path) } - - /// Process a blinded account node request. - /// - /// Used by account workers to retrieve blinded account trie nodes for proof construction. - fn process_blinded_account_node(&self, path: &Nibbles) -> TrieNodeProviderResult { - let account_node_provider = - ProofBlindedAccountProvider::new(&self.provider, &self.provider); - account_node_provider.trie_node(path) - } } impl TrieNodeProviderFactory for ProofWorkerHandle { type AccountNodeProvider = ProofTaskTrieNodeProvider; @@ -888,7 +897,12 @@ where // Initially mark this worker as available. self.available_workers.fetch_add(1, Ordering::Relaxed); + let mut total_idle_time = Duration::ZERO; + let mut idle_start = Instant::now(); + while let Ok(job) = self.work_rx.recv() { + total_idle_time += idle_start.elapsed(); + // Mark worker as busy. self.available_workers.fetch_sub(1, Ordering::Relaxed); @@ -918,6 +932,8 @@ where // Mark worker as available again. self.available_workers.fetch_add(1, Ordering::Relaxed); + + idle_start = Instant::now(); } trace!( @@ -925,12 +941,14 @@ where worker_id = self.worker_id, storage_proofs_processed, storage_nodes_processed, + total_idle_time_us = total_idle_time.as_micros(), "Storage worker shutting down" ); #[cfg(feature = "metrics")] { self.metrics.record_storage_nodes(storage_nodes_processed as usize); + self.metrics.record_storage_worker_idle_time(total_idle_time); self.cursor_metrics.record(&mut cursor_metrics_cache); } @@ -1094,7 +1112,7 @@ struct AccountProofWorker { work_rx: CrossbeamReceiver, /// Unique identifier for this worker (used for tracing) worker_id: usize, - /// Channel for dispatching storage proof work + /// Channel for dispatching storage proof work (for pre-dispatched target proofs) storage_work_tx: CrossbeamSender, /// Counter tracking worker availability available_workers: Arc, @@ -1165,9 +1183,7 @@ where /// If this function panics, the worker thread terminates but other workers /// continue operating and the system degrades gracefully. fn run(mut self) -> ProviderResult<()> { - // Create provider from factory let provider = self.task_ctx.factory.database_provider_ro()?; - let proof_tx = ProofTaskTx::new(provider, self.worker_id); trace!( target: "trie::proof_task", @@ -1179,39 +1195,64 @@ where let mut account_nodes_processed = 0u64; let mut cursor_metrics_cache = ProofTaskCursorMetricsCache::default(); - let mut v2_calculator = if self.v2_enabled { - let trie_cursor = proof_tx.provider.account_trie_cursor()?; - let hashed_cursor = proof_tx.provider.hashed_account_cursor()?; - Some(proof_v2::ProofCalculator::<_, _, AsyncAccountValueEncoder>::new( - trie_cursor, - hashed_cursor, - )) + // Create both account and storage calculators for V2 proofs. + // The storage calculator is wrapped in Rc> for sharing with value encoders. + let (mut v2_account_calculator, v2_storage_calculator) = if self.v2_enabled { + let account_trie_cursor = provider.account_trie_cursor()?; + let account_hashed_cursor = provider.hashed_account_cursor()?; + + let storage_trie_cursor = provider.storage_trie_cursor(B256::ZERO)?; + let storage_hashed_cursor = provider.hashed_storage_cursor(B256::ZERO)?; + + ( + Some(proof_v2::ProofCalculator::< + _, + _, + AsyncAccountValueEncoder< + ::StorageTrieCursor<'_>, + ::StorageCursor<'_>, + >, + >::new(account_trie_cursor, account_hashed_cursor)), + Some(Rc::new(RefCell::new(proof_v2::StorageProofCalculator::new_storage( + storage_trie_cursor, + storage_hashed_cursor, + )))), + ) } else { - None + (None, None) }; // Count this worker as available only after successful initialization. self.available_workers.fetch_add(1, Ordering::Relaxed); + let mut total_idle_time = Duration::ZERO; + let mut idle_start = Instant::now(); + let mut value_encoder_stats_cache = ValueEncoderStats::default(); + while let Ok(job) = self.work_rx.recv() { + total_idle_time += idle_start.elapsed(); + // Mark worker as busy. self.available_workers.fetch_sub(1, Ordering::Relaxed); match job { AccountWorkerJob::AccountMultiproof { input } => { - self.process_account_multiproof( - &proof_tx, - v2_calculator.as_mut(), + let value_encoder_stats = self.process_account_multiproof( + &provider, + v2_account_calculator.as_mut(), + v2_storage_calculator.clone(), *input, &mut account_proofs_processed, &mut cursor_metrics_cache, ); + total_idle_time += value_encoder_stats.storage_wait_time; + value_encoder_stats_cache.extend(&value_encoder_stats); } AccountWorkerJob::BlindedAccountNode { path, result_sender } => { Self::process_blinded_node( self.worker_id, - &proof_tx, + &provider, path, result_sender, &mut account_nodes_processed, @@ -1221,6 +1262,8 @@ where // Mark worker as available again. self.available_workers.fetch_add(1, Ordering::Relaxed); + + idle_start = Instant::now(); } trace!( @@ -1228,13 +1271,16 @@ where worker_id=self.worker_id, account_proofs_processed, account_nodes_processed, + total_idle_time_us = total_idle_time.as_micros(), "Account worker shutting down" ); #[cfg(feature = "metrics")] { self.metrics.record_account_nodes(account_nodes_processed as usize); + self.metrics.record_account_worker_idle_time(total_idle_time); self.cursor_metrics.record(&mut cursor_metrics_cache); + self.metrics.record_value_encoder_stats(&value_encoder_stats_cache); } Ok(()) @@ -1242,13 +1288,13 @@ where fn compute_legacy_account_multiproof( &self, - proof_tx: &ProofTaskTx, + provider: &Provider, targets: MultiProofTargets, mut prefix_sets: TriePrefixSets, collect_branch_node_masks: bool, multi_added_removed_keys: Option>, proof_cursor_metrics: &mut ProofTaskCursorMetricsCache, - ) -> Result + ) -> Result<(ProofResult, Duration), ParallelStateRootError> where Provider: TrieCursorFactory + HashedCursorFactory, { @@ -1293,28 +1339,27 @@ where cached_storage_roots: &self.cached_storage_roots, }; + let mut storage_wait_time = Duration::ZERO; let result = build_account_multiproof_with_storage_roots( - &proof_tx.provider, + provider, ctx, &mut tracker, proof_cursor_metrics, - ); + &mut storage_wait_time, + )?; let stats = tracker.finish(); - result.map(|proof| ProofResult::Legacy(proof, stats)) + Ok((ProofResult::Legacy(result, stats), storage_wait_time)) } - fn compute_v2_account_multiproof( + fn compute_v2_account_multiproof<'a, Provider>( &self, - v2_calculator: &mut proof_v2::ProofCalculator< - ::AccountTrieCursor<'_>, - ::AccountCursor<'_>, - AsyncAccountValueEncoder, - >, + v2_account_calculator: &mut V2AccountProofCalculator<'a, Provider>, + v2_storage_calculator: Rc>>, targets: MultiProofTargetsV2, - ) -> Result + ) -> Result<(ProofResult, ValueEncoderStats), ParallelStateRootError> where - Provider: TrieCursorFactory + HashedCursorFactory, + Provider: TrieCursorFactory + HashedCursorFactory + 'a, { let MultiProofTargetsV2 { mut account_targets, storage_targets } = targets; @@ -1333,64 +1378,75 @@ where dispatch_v2_storage_proofs(&self.storage_work_tx, &account_targets, storage_targets)?; let mut value_encoder = AsyncAccountValueEncoder::new( - self.storage_work_tx.clone(), storage_proof_receivers, self.cached_storage_roots.clone(), + v2_storage_calculator, ); - let proof = DecodedMultiProofV2 { - account_proofs: v2_calculator.proof(&mut value_encoder, &mut account_targets)?, - storage_proofs: value_encoder.into_storage_proofs()?, - }; + let account_proofs = + v2_account_calculator.proof(&mut value_encoder, &mut account_targets)?; - Ok(ProofResult::V2(proof)) + let (storage_proofs, value_encoder_stats) = value_encoder.finalize()?; + + let proof = DecodedMultiProofV2 { account_proofs, storage_proofs }; + + Ok((ProofResult::V2(proof), value_encoder_stats)) } /// Processes an account multiproof request. - fn process_account_multiproof( + /// + /// Returns stats from the value encoder used during proof computation. + fn process_account_multiproof<'a, Provider>( &self, - proof_tx: &ProofTaskTx, - v2_calculator: Option< - &mut proof_v2::ProofCalculator< - ::AccountTrieCursor<'_>, - ::AccountCursor<'_>, - AsyncAccountValueEncoder, - >, - >, + provider: &Provider, + v2_account_calculator: Option<&mut V2AccountProofCalculator<'a, Provider>>, + v2_storage_calculator: Option>>>, input: AccountMultiproofInput, account_proofs_processed: &mut u64, cursor_metrics_cache: &mut ProofTaskCursorMetricsCache, - ) where - Provider: TrieCursorFactory + HashedCursorFactory, + ) -> ValueEncoderStats + where + Provider: TrieCursorFactory + HashedCursorFactory + 'a, { let mut proof_cursor_metrics = ProofTaskCursorMetricsCache::default(); let proof_start = Instant::now(); - let (proof_result_sender, result) = match input { + let (proof_result_sender, result, value_encoder_stats) = match input { AccountMultiproofInput::Legacy { targets, prefix_sets, collect_branch_node_masks, multi_added_removed_keys, proof_result_sender, - } => ( - proof_result_sender, - self.compute_legacy_account_multiproof( - proof_tx, + } => { + let (result, value_encoder_stats) = match self.compute_legacy_account_multiproof( + provider, targets, prefix_sets, collect_branch_node_masks, multi_added_removed_keys, &mut proof_cursor_metrics, - ), - ), - AccountMultiproofInput::V2 { targets, proof_result_sender } => ( - proof_result_sender, - self.compute_v2_account_multiproof::( - v2_calculator.expect("v2 calculator provided"), - targets, - ), - ), + ) { + Ok((proof, wait_time)) => ( + Ok(proof), + ValueEncoderStats { storage_wait_time: wait_time, ..Default::default() }, + ), + Err(e) => (Err(e), ValueEncoderStats::default()), + }; + (proof_result_sender, result, value_encoder_stats) + } + AccountMultiproofInput::V2 { targets, proof_result_sender } => { + let (result, value_encoder_stats) = match self + .compute_v2_account_multiproof::( + v2_account_calculator.expect("v2 account calculator provided"), + v2_storage_calculator.expect("v2 storage calculator provided"), + targets, + ) { + Ok((proof, stats)) => (Ok(proof), stats), + Err(e) => (Err(e), ValueEncoderStats::default()), + }; + (proof_result_sender, result, value_encoder_stats) + } }; let ProofResultContext { @@ -1443,12 +1499,14 @@ where #[cfg(feature = "metrics")] // Accumulate per-proof metrics into the worker's cache cursor_metrics_cache.extend(&proof_cursor_metrics); + + value_encoder_stats } /// Processes a blinded account node lookup request. fn process_blinded_node( worker_id: usize, - proof_tx: &ProofTaskTx, + provider: &Provider, path: Nibbles, result_sender: Sender, account_nodes_processed: &mut u64, @@ -1469,7 +1527,8 @@ where ); let start = Instant::now(); - let result = proof_tx.process_blinded_account_node(&path); + let account_node_provider = ProofBlindedAccountProvider::new(provider, provider); + let result = account_node_provider.trie_node(&path); let elapsed = start.elapsed(); *account_nodes_processed += 1; @@ -1500,11 +1559,13 @@ where /// enabling interleaved parallelism between account trie traversal and storage proof computation. /// /// Returns a `DecodedMultiProof` containing the account subtree and storage proofs. +/// Also accumulates the time spent waiting for storage proofs into `storage_wait_time`. fn build_account_multiproof_with_storage_roots

( provider: &P, ctx: AccountMultiproofParams<'_>, tracker: &mut ParallelTrieTracker, proof_cursor_metrics: &mut ProofTaskCursorMetricsCache, + storage_wait_time: &mut Duration, ) -> Result where P: TrieCursorFactory + HashedCursorFactory, @@ -1568,6 +1629,7 @@ where ); // Block on this specific storage proof receiver - enables interleaved // parallelism + let wait_start = Instant::now(); let proof_msg = receiver.recv().map_err(|_| { ParallelStateRootError::StorageRoot( reth_execution_errors::StorageRootError::Database( @@ -1577,6 +1639,7 @@ where ), ) })?; + *storage_wait_time += wait_start.elapsed(); drop(_guard); @@ -1668,7 +1731,9 @@ where // Consume remaining storage proof receivers for accounts not encountered during trie walk. // Done last to allow storage workers more time to complete while we finalized the account trie. for (hashed_address, receiver) in storage_proof_receivers { + let wait_start = Instant::now(); if let Ok(proof_msg) = receiver.recv() { + *storage_wait_time += wait_start.elapsed(); let proof_result = proof_msg.result?; let proof = Into::>::into(proof_result) .expect("Partial proofs are not yet supported"); diff --git a/crates/trie/parallel/src/proof_task_metrics.rs b/crates/trie/parallel/src/proof_task_metrics.rs index f9b8d70c16..e303df287b 100644 --- a/crates/trie/parallel/src/proof_task_metrics.rs +++ b/crates/trie/parallel/src/proof_task_metrics.rs @@ -1,9 +1,11 @@ +use crate::value_encoder::ValueEncoderStats; use reth_metrics::{metrics::Histogram, Metrics}; use reth_trie::{ hashed_cursor::{HashedCursorMetrics, HashedCursorMetricsCache}, trie_cursor::{TrieCursorMetrics, TrieCursorMetricsCache}, TrieType, }; +use std::time::Duration; /// Metrics for the proof task. #[derive(Clone, Metrics)] @@ -13,6 +15,17 @@ pub struct ProofTaskTrieMetrics { blinded_account_nodes: Histogram, /// A histogram for the number of blinded storage nodes fetched. blinded_storage_nodes: Histogram, + /// Histogram for storage worker idle time in seconds (waiting for proof jobs). + storage_worker_idle_time_seconds: Histogram, + /// Histogram for account worker idle time in seconds (waiting for proof jobs + storage + /// results). + account_worker_idle_time_seconds: Histogram, + /// Histogram for `Dispatched` deferred encoder variant count. + deferred_encoder_dispatched: Histogram, + /// Histogram for `FromCache` deferred encoder variant count. + deferred_encoder_from_cache: Histogram, + /// Histogram for `Sync` deferred encoder variant count. + deferred_encoder_sync: Histogram, } impl ProofTaskTrieMetrics { @@ -25,6 +38,23 @@ impl ProofTaskTrieMetrics { pub fn record_storage_nodes(&self, count: usize) { self.blinded_storage_nodes.record(count as f64); } + + /// Record storage worker idle time. + pub fn record_storage_worker_idle_time(&self, duration: Duration) { + self.storage_worker_idle_time_seconds.record(duration.as_secs_f64()); + } + + /// Record account worker idle time. + pub fn record_account_worker_idle_time(&self, duration: Duration) { + self.account_worker_idle_time_seconds.record(duration.as_secs_f64()); + } + + /// Record value encoder stats (deferred encoder variant counts). + pub(crate) fn record_value_encoder_stats(&self, stats: &ValueEncoderStats) { + self.deferred_encoder_dispatched.record(stats.dispatched_count as f64); + self.deferred_encoder_from_cache.record(stats.from_cache_count as f64); + self.deferred_encoder_sync.record(stats.sync_count as f64); + } } /// Cursor metrics for proof task operations. diff --git a/crates/trie/parallel/src/value_encoder.rs b/crates/trie/parallel/src/value_encoder.rs index 7b08d3e1b5..0b082a08d7 100644 --- a/crates/trie/parallel/src/value_encoder.rs +++ b/crates/trie/parallel/src/value_encoder.rs @@ -1,36 +1,79 @@ -use crate::proof_task::{ - StorageProofInput, StorageProofResult, StorageProofResultMessage, StorageWorkerJob, -}; +use crate::proof_task::{StorageProofResult, StorageProofResultMessage}; use alloy_primitives::{map::B256Map, B256}; use alloy_rlp::Encodable; use core::cell::RefCell; -use crossbeam_channel::{Receiver as CrossbeamReceiver, Sender as CrossbeamSender}; +use crossbeam_channel::Receiver as CrossbeamReceiver; use dashmap::DashMap; use reth_execution_errors::trie::StateProofError; use reth_primitives_traits::Account; use reth_storage_errors::db::DatabaseError; use reth_trie::{ - proof_v2::{DeferredValueEncoder, LeafValueEncoder, Target}, + hashed_cursor::HashedStorageCursor, + proof_v2::{DeferredValueEncoder, LeafValueEncoder, StorageProofCalculator}, + trie_cursor::TrieStorageCursor, ProofTrieNode, }; -use std::{rc::Rc, sync::Arc}; +use std::{ + rc::Rc, + sync::Arc, + time::{Duration, Instant}, +}; + +/// Stats collected by [`AsyncAccountValueEncoder`] during proof computation. +/// +/// Tracks time spent waiting for storage proofs and counts of each deferred encoder variant used. +#[derive(Debug, Default, Clone, Copy)] +pub(crate) struct ValueEncoderStats { + /// Accumulated time spent waiting for storage proof results from dispatched workers. + pub(crate) storage_wait_time: Duration, + /// Number of times the `Dispatched` variant was used (proof pre-dispatched to workers). + pub(crate) dispatched_count: u64, + /// Number of times the `FromCache` variant was used (storage root already cached). + pub(crate) from_cache_count: u64, + /// Number of times the `Sync` variant was used (synchronous computation). + pub(crate) sync_count: u64, +} + +impl ValueEncoderStats { + /// Extends this metrics by adding the values from another. + pub(crate) fn extend(&mut self, other: &Self) { + self.storage_wait_time += other.storage_wait_time; + self.dispatched_count += other.dispatched_count; + self.from_cache_count += other.from_cache_count; + self.sync_count += other.sync_count; + } +} /// Returned from [`AsyncAccountValueEncoder`], used to track an async storage root calculation. -pub(crate) enum AsyncAccountDeferredValueEncoder { +pub(crate) enum AsyncAccountDeferredValueEncoder { + /// A storage proof job was dispatched to the worker pool. Dispatched { hashed_address: B256, account: Account, proof_result_rx: Result, DatabaseError>, - // None if results shouldn't be retained for this dispatched proof. - storage_proof_results: Option>>>>, + /// Shared storage proof results. + storage_proof_results: Rc>>>, + /// Shared stats for tracking wait time and counts. + stats: Rc>, }, - FromCache { + /// The storage root was found in cache. + FromCache { account: Account, root: B256 }, + /// Synchronous storage root computation. + Sync { + /// Shared storage proof calculator for computing storage roots. + storage_calculator: Rc>>, + hashed_address: B256, account: Account, - root: B256, + /// Cache to store computed storage roots for future reuse. + cached_storage_roots: Arc>, }, } -impl DeferredValueEncoder for AsyncAccountDeferredValueEncoder { +impl DeferredValueEncoder for AsyncAccountDeferredValueEncoder +where + TC: TrieStorageCursor, + HC: HashedStorageCursor, +{ fn encode(self, buf: &mut Vec) -> Result<(), StateProofError> { let (account, root) = match self { Self::Dispatched { @@ -38,7 +81,9 @@ impl DeferredValueEncoder for AsyncAccountDeferredValueEncoder { account, proof_result_rx, storage_proof_results, + stats, } => { + let wait_start = Instant::now(); let result = proof_result_rx? .recv() .map_err(|_| { @@ -47,18 +92,27 @@ impl DeferredValueEncoder for AsyncAccountDeferredValueEncoder { ))) })? .result?; + stats.borrow_mut().storage_wait_time += wait_start.elapsed(); let StorageProofResult::V2 { root: Some(root), proof } = result else { panic!("StorageProofResult is not V2 with root: {result:?}") }; - if let Some(storage_proof_results) = storage_proof_results.as_ref() { - storage_proof_results.borrow_mut().insert(hashed_address, proof); - } + storage_proof_results.borrow_mut().insert(hashed_address, proof); (account, root) } Self::FromCache { account, root } => (account, root), + Self::Sync { storage_calculator, hashed_address, account, cached_storage_roots } => { + let mut calculator = storage_calculator.borrow_mut(); + let proof = calculator.storage_proof(hashed_address, &mut [B256::ZERO.into()])?; + let storage_root = calculator + .compute_root_hash(&proof)? + .expect("storage_proof with dummy target always returns root"); + + cached_storage_roots.insert(hashed_address, storage_root); + (account, storage_root) + } }; let account = account.into_trie_account(root); @@ -67,12 +121,15 @@ impl DeferredValueEncoder for AsyncAccountDeferredValueEncoder { } } -/// Implements the [`LeafValueEncoder`] trait for accounts using a [`CrossbeamSender`] to dispatch -/// and compute storage roots asynchronously. Can also accept a set of already dispatched account -/// storage proofs, for cases where it's possible to determine some necessary accounts ahead of -/// time. -pub(crate) struct AsyncAccountValueEncoder { - storage_work_tx: CrossbeamSender, +/// Implements the [`LeafValueEncoder`] trait for accounts. +/// +/// Accepts a set of pre-dispatched storage proof receivers for accounts whose storage roots are +/// being computed asynchronously by worker threads. +/// +/// For accounts without pre-dispatched proofs or cached roots, uses a shared +/// [`StorageProofCalculator`] to compute storage roots synchronously, reusing cursors across +/// multiple accounts. +pub(crate) struct AsyncAccountValueEncoder { /// Storage proof jobs which were dispatched ahead of time. dispatched: B256Map>, /// Storage roots which have already been computed. This can be used only if a storage proof @@ -81,39 +138,59 @@ pub(crate) struct AsyncAccountValueEncoder { /// Tracks storage proof results received from the storage workers. [`Rc`] + [`RefCell`] is /// required because [`DeferredValueEncoder`] cannot have a lifetime. storage_proof_results: Rc>>>, + /// Shared storage proof calculator for synchronous computation. Reuses cursors and internal + /// buffers across multiple storage root calculations. + storage_calculator: Rc>>, + /// Shared stats for tracking wait time and variant counts. + stats: Rc>, } -impl AsyncAccountValueEncoder { - /// Initializes a [`Self`] using a `ProofWorkerHandle` which will be used to calculate storage - /// roots asynchronously. +impl AsyncAccountValueEncoder { + /// Initializes a [`Self`] using a storage proof calculator which will be reused to calculate + /// storage roots synchronously. + /// + /// # Parameters + /// - `dispatched`: Pre-dispatched storage proof receivers for target accounts + /// - `cached_storage_roots`: Shared cache of already-computed storage roots + /// - `storage_calculator`: Shared storage proof calculator for synchronous computation pub(crate) fn new( - storage_work_tx: CrossbeamSender, dispatched: B256Map>, cached_storage_roots: Arc>, + storage_calculator: Rc>>, ) -> Self { Self { - storage_work_tx, dispatched, cached_storage_roots, storage_proof_results: Default::default(), + storage_calculator, + stats: Default::default(), } } - /// Consume [`Self`] and return all collected storage proofs which had been dispatched. + /// Consume [`Self`] and return all collected storage proofs along with accumulated stats. + /// + /// This method collects any remaining dispatched proofs that weren't consumed during proof + /// calculation and includes their wait time in the returned stats. /// /// # Panics /// /// This method panics if any deferred encoders produced by [`Self::deferred_encoder`] have not /// been dropped. - pub(crate) fn into_storage_proofs( + pub(crate) fn finalize( self, - ) -> Result>, StateProofError> { + ) -> Result<(B256Map>, ValueEncoderStats), StateProofError> { let mut storage_proof_results = Rc::into_inner(self.storage_proof_results) .expect("no deferred encoders are still allocated") .into_inner(); - // Any remaining dispatched proofs need to have their results collected + let mut stats = Rc::into_inner(self.stats) + .expect("no deferred encoders are still allocated") + .into_inner(); + + // Any remaining dispatched proofs need to have their results collected. + // These are proofs that were pre-dispatched but not consumed during proof calculation. for (hashed_address, rx) in &self.dispatched { + let wait_start = Instant::now(); let result = rx .recv() .map_err(|_| { @@ -122,6 +199,7 @@ impl AsyncAccountValueEncoder { ))) })? .result?; + stats.storage_wait_time += wait_start.elapsed(); let StorageProofResult::V2 { proof, .. } = result else { panic!("StorageProofResult is not V2: {result:?}") @@ -130,13 +208,17 @@ impl AsyncAccountValueEncoder { storage_proof_results.insert(*hashed_address, proof); } - Ok(storage_proof_results) + Ok((storage_proof_results, stats)) } } -impl LeafValueEncoder for AsyncAccountValueEncoder { +impl LeafValueEncoder for AsyncAccountValueEncoder +where + TC: TrieStorageCursor, + HC: HashedStorageCursor, +{ type Value = Account; - type DeferredEncoder = AsyncAccountDeferredValueEncoder; + type DeferredEncoder = AsyncAccountDeferredValueEncoder; fn deferred_encoder( &mut self, @@ -146,11 +228,13 @@ impl LeafValueEncoder for AsyncAccountValueEncoder { // If the proof job has already been dispatched for this account then it's not necessary to // dispatch another. if let Some(rx) = self.dispatched.remove(&hashed_address) { + self.stats.borrow_mut().dispatched_count += 1; return AsyncAccountDeferredValueEncoder::Dispatched { hashed_address, account, proof_result_rx: Ok(rx), - storage_proof_results: Some(self.storage_proof_results.clone()), + storage_proof_results: self.storage_proof_results.clone(), + stats: self.stats.clone(), } } @@ -159,25 +243,17 @@ impl LeafValueEncoder for AsyncAccountValueEncoder { // If the root is already calculated then just use it directly if let Some(root) = self.cached_storage_roots.get(&hashed_address) { + self.stats.borrow_mut().from_cache_count += 1; return AsyncAccountDeferredValueEncoder::FromCache { account, root: *root } } - // Create a proof input which targets a bogus key, so that we calculate the root as a - // side-effect. - let input = StorageProofInput::new(hashed_address, vec![Target::new(B256::ZERO)]); - let (tx, rx) = crossbeam_channel::bounded(1); - - let proof_result_rx = self - .storage_work_tx - .send(StorageWorkerJob::StorageProof { input, proof_result_sender: tx }) - .map_err(|_| DatabaseError::Other("storage workers unavailable".to_string())) - .map(|_| rx); - - AsyncAccountDeferredValueEncoder::Dispatched { + // Compute storage root synchronously using the shared calculator + self.stats.borrow_mut().sync_count += 1; + AsyncAccountDeferredValueEncoder::Sync { + storage_calculator: self.storage_calculator.clone(), hashed_address, account, - proof_result_rx, - storage_proof_results: None, + cached_storage_roots: self.cached_storage_roots.clone(), } } } diff --git a/crates/trie/trie/src/proof_v2/value.rs b/crates/trie/trie/src/proof_v2/value.rs index 2b7b085119..6c15bbdf08 100644 --- a/crates/trie/trie/src/proof_v2/value.rs +++ b/crates/trie/trie/src/proof_v2/value.rs @@ -109,39 +109,20 @@ where T: TrieCursorFactory, H: HashedCursorFactory, { - // Synchronously computes the storage root for this account and RLP-encodes the resulting - // `TrieAccount` into `buf` fn encode(self, buf: &mut Vec) -> Result<(), StateProofError> { - // Create cursors for storage proof calculation let trie_cursor = self.trie_cursor_factory.storage_trie_cursor(self.hashed_address)?; let hashed_cursor = self.hashed_cursor_factory.hashed_storage_cursor(self.hashed_address)?; - // Create storage proof calculator with StorageValueEncoder let mut storage_proof_calculator = ProofCalculator::new_storage(trie_cursor, hashed_cursor); - // Compute storage root by calling storage_proof with the root path as a target. - // This returns just the root node of the storage trie. + let proof = storage_proof_calculator + .storage_proof(self.hashed_address, &mut [B256::ZERO.into()])?; let storage_root = storage_proof_calculator - .storage_proof(self.hashed_address, &mut [B256::ZERO.into()]) - .map(|nodes| { - // Encode the root node to RLP and hash it - let root_node = - nodes.first().expect("storage_proof always returns at least the root"); - root_node.node.encode(buf); + .compute_root_hash(&proof)? + .expect("storage_proof with dummy target always returns root"); - let storage_root = alloy_primitives::keccak256(buf.as_slice()); - - // Clear the buffer so we can re-use it to encode the TrieAccount - buf.clear(); - - storage_root - })?; - - // Combine account with storage root to create TrieAccount let trie_account = self.account.into_trie_account(storage_root); - - // Encode the trie account trie_account.encode(buf); Ok(())