refactor(trie): reuse shared StorageProofCalculator for V2 sync storage roots and add deferred encoder metrics (#21424)

Co-authored-by: Amp <amp@ampcode.com>
Brian Picciano
2026-01-27 15:54:56 +01:00
committed by GitHub
parent 08cd1cbda6
commit bff11ab663
6 changed files with 295 additions and 161 deletions

View File

@@ -34,11 +34,6 @@ fn default_account_worker_count() -> usize {
/// The size of proof targets chunk to spawn in one multiproof calculation.
pub const DEFAULT_MULTIPROOF_TASK_CHUNK_SIZE: usize = 60;
/// The size of proof targets chunk to spawn in one multiproof calculation when V2 proofs are
/// enabled. This is 4x the default chunk size to take advantage of more efficient V2 proof
/// computation.
pub const DEFAULT_MULTIPROOF_TASK_CHUNK_SIZE_V2: usize = DEFAULT_MULTIPROOF_TASK_CHUNK_SIZE * 4;
/// Default number of reserved CPU cores for non-reth processes.
///
/// This will be deducted from the thread count of main reth global threadpool.
@@ -277,17 +272,6 @@ impl TreeConfig {
self.multiproof_chunk_size
}
/// Return the multiproof task chunk size, using the V2 default if V2 proofs are enabled
/// and the chunk size is at the default value.
pub const fn effective_multiproof_chunk_size(&self) -> usize {
if self.enable_proof_v2 && self.multiproof_chunk_size == DEFAULT_MULTIPROOF_TASK_CHUNK_SIZE
{
DEFAULT_MULTIPROOF_TASK_CHUNK_SIZE_V2
} else {
self.multiproof_chunk_size
}
}
/// Return the number of reserved CPU cores for non-reth processes
pub const fn reserved_cpu_cores(&self) -> usize {
self.reserved_cpu_cores

View File

@@ -286,9 +286,7 @@ where
let multi_proof_task = MultiProofTask::new(
proof_handle.clone(),
to_sparse_trie,
config
.multiproof_chunking_enabled()
.then_some(config.effective_multiproof_chunk_size()),
config.multiproof_chunking_enabled().then_some(config.multiproof_chunk_size()),
to_multi_proof.clone(),
from_multi_proof,
)
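Note: the `DEFAULT_MULTIPROOF_TASK_CHUNK_SIZE_V2` constant and `effective_multiproof_chunk_size()` helper removed above selected a 4x chunk size whenever V2 proofs were enabled and the size was still at its default; the call site now passes the configured `multiproof_chunk_size()` through unchanged. A minimal sketch of the before/after behavior, using a simplified stand-in config type rather than the real `TreeConfig`:

```rust
/// Simplified stand-in for the chunk-size selection (hypothetical `Config`,
/// mirroring the diff rather than the actual reth `TreeConfig`).
const DEFAULT_CHUNK_SIZE: usize = 60;

struct Config {
    enable_proof_v2: bool,
    multiproof_chunk_size: usize,
}

impl Config {
    /// Removed behavior: quadruple the chunk size when V2 proofs are enabled
    /// and the user left the size at its default.
    const fn effective_chunk_size_old(&self) -> usize {
        if self.enable_proof_v2 && self.multiproof_chunk_size == DEFAULT_CHUNK_SIZE {
            DEFAULT_CHUNK_SIZE * 4
        } else {
            self.multiproof_chunk_size
        }
    }

    /// New behavior: the configured size is used as-is, V2 or not.
    const fn effective_chunk_size_new(&self) -> usize {
        self.multiproof_chunk_size
    }
}

fn main() {
    let cfg = Config { enable_proof_v2: true, multiproof_chunk_size: DEFAULT_CHUNK_SIZE };
    assert_eq!(cfg.effective_chunk_size_old(), 240);
    assert_eq!(cfg.effective_chunk_size_new(), 60);
}
```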

View File

@@ -33,7 +33,7 @@ use crate::{
root::ParallelStateRootError,
stats::{ParallelTrieStats, ParallelTrieTracker},
targets_v2::MultiProofTargetsV2,
value_encoder::AsyncAccountValueEncoder,
value_encoder::{AsyncAccountValueEncoder, ValueEncoderStats},
StorageRootTargets,
};
use alloy_primitives::{
@@ -65,6 +65,8 @@ use reth_trie_common::{
};
use reth_trie_sparse::provider::{RevealedNode, TrieNodeProvider, TrieNodeProviderFactory};
use std::{
cell::RefCell,
rc::Rc,
sync::{
atomic::{AtomicUsize, Ordering},
mpsc::{channel, Receiver, Sender},
@@ -82,6 +84,22 @@ use crate::proof_task_metrics::{
type TrieNodeProviderResult = Result<Option<RevealedNode>, SparseTrieError>;
/// Type alias for the V2 account proof calculator.
type V2AccountProofCalculator<'a, Provider> = proof_v2::ProofCalculator<
<Provider as TrieCursorFactory>::AccountTrieCursor<'a>,
<Provider as HashedCursorFactory>::AccountCursor<'a>,
AsyncAccountValueEncoder<
<Provider as TrieCursorFactory>::StorageTrieCursor<'a>,
<Provider as HashedCursorFactory>::StorageCursor<'a>,
>,
>;
/// Type alias for the V2 storage proof calculator.
type V2StorageProofCalculator<'a, Provider> = proof_v2::StorageProofCalculator<
<Provider as TrieCursorFactory>::StorageTrieCursor<'a>,
<Provider as HashedCursorFactory>::StorageCursor<'a>,
>;
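Note: the two aliases above only bundle the provider's lifetime-generic cursor projections so the worker code below can name the calculators once. A rough, self-contained sketch of the pattern with hypothetical simplified traits (not the real cursor factories):

```rust
/// Hypothetical, heavily simplified factories with lifetime-generic associated cursors.
trait TrieCursorFactory {
    type AccountTrieCursor<'a>
    where
        Self: 'a;
}

trait HashedCursorFactory {
    type AccountCursor<'a>
    where
        Self: 'a;
}

/// Stand-in for the proof calculator, generic over the cursors it drives.
struct ProofCalculator<T, H> {
    trie_cursor: T,
    hashed_cursor: H,
}

/// The alias spells out each projection exactly once, so signatures can say
/// `V2AccountCalculator<'a, P>` instead of repeating every associated type.
type V2AccountCalculator<'a, P> = ProofCalculator<
    <P as TrieCursorFactory>::AccountTrieCursor<'a>,
    <P as HashedCursorFactory>::AccountCursor<'a>,
>;

fn main() {} // type-level sketch only; nothing to run
```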
/// A handle that provides type-safe access to proof worker pools.
///
/// The handle stores direct senders to both storage and account worker pools,
@@ -543,15 +561,6 @@ where
ProofBlindedStorageProvider::new(&self.provider, &self.provider, account);
storage_node_provider.trie_node(path)
}
/// Process a blinded account node request.
///
/// Used by account workers to retrieve blinded account trie nodes for proof construction.
fn process_blinded_account_node(&self, path: &Nibbles) -> TrieNodeProviderResult {
let account_node_provider =
ProofBlindedAccountProvider::new(&self.provider, &self.provider);
account_node_provider.trie_node(path)
}
}
impl TrieNodeProviderFactory for ProofWorkerHandle {
type AccountNodeProvider = ProofTaskTrieNodeProvider;
@@ -888,7 +897,12 @@ where
// Initially mark this worker as available.
self.available_workers.fetch_add(1, Ordering::Relaxed);
let mut total_idle_time = Duration::ZERO;
let mut idle_start = Instant::now();
while let Ok(job) = self.work_rx.recv() {
total_idle_time += idle_start.elapsed();
// Mark worker as busy.
self.available_workers.fetch_sub(1, Ordering::Relaxed);
@@ -918,6 +932,8 @@ where
// Mark worker as available again.
self.available_workers.fetch_add(1, Ordering::Relaxed);
idle_start = Instant::now();
}
trace!(
@@ -925,12 +941,14 @@ where
worker_id = self.worker_id,
storage_proofs_processed,
storage_nodes_processed,
total_idle_time_us = total_idle_time.as_micros(),
"Storage worker shutting down"
);
#[cfg(feature = "metrics")]
{
self.metrics.record_storage_nodes(storage_nodes_processed as usize);
self.metrics.record_storage_worker_idle_time(total_idle_time);
self.cursor_metrics.record(&mut cursor_metrics_cache);
}
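Note: both worker loops now account idle time the same way; only the time spent blocked on `recv` between jobs is counted, and the total is recorded as a single histogram sample at shutdown. A minimal sketch of the pattern using `std::sync::mpsc` and a placeholder job type (the real workers use crossbeam channels):

```rust
use std::sync::mpsc::{channel, Receiver};
use std::thread;
use std::time::{Duration, Instant};

/// Sketch of the worker idle-time accounting (placeholder `u64` jobs).
fn worker_loop(work_rx: Receiver<u64>) -> Duration {
    let mut total_idle_time = Duration::ZERO;
    let mut idle_start = Instant::now();

    while let Ok(job) = work_rx.recv() {
        // Time spent blocked on `recv` since the previous job finished is idle time.
        total_idle_time += idle_start.elapsed();

        // ... process `job` here ...
        let _ = job;

        // Restart the idle clock once the job is done.
        idle_start = Instant::now();
    }

    // Recorded once, as a histogram sample, when the worker shuts down.
    total_idle_time
}

fn main() {
    let (tx, rx) = channel();
    let worker = thread::spawn(move || worker_loop(rx));
    tx.send(1u64).unwrap();
    drop(tx); // closing the channel ends the loop
    println!("total idle time: {:?}", worker.join().unwrap());
}
```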
@@ -1094,7 +1112,7 @@ struct AccountProofWorker<Factory> {
work_rx: CrossbeamReceiver<AccountWorkerJob>,
/// Unique identifier for this worker (used for tracing)
worker_id: usize,
/// Channel for dispatching storage proof work
/// Channel for dispatching storage proof work (for pre-dispatched target proofs)
storage_work_tx: CrossbeamSender<StorageWorkerJob>,
/// Counter tracking worker availability
available_workers: Arc<AtomicUsize>,
@@ -1165,9 +1183,7 @@ where
/// If this function panics, the worker thread terminates but other workers
/// continue operating and the system degrades gracefully.
fn run(mut self) -> ProviderResult<()> {
// Create provider from factory
let provider = self.task_ctx.factory.database_provider_ro()?;
let proof_tx = ProofTaskTx::new(provider, self.worker_id);
trace!(
target: "trie::proof_task",
@@ -1179,39 +1195,64 @@ where
let mut account_nodes_processed = 0u64;
let mut cursor_metrics_cache = ProofTaskCursorMetricsCache::default();
let mut v2_calculator = if self.v2_enabled {
let trie_cursor = proof_tx.provider.account_trie_cursor()?;
let hashed_cursor = proof_tx.provider.hashed_account_cursor()?;
Some(proof_v2::ProofCalculator::<_, _, AsyncAccountValueEncoder>::new(
trie_cursor,
hashed_cursor,
))
// Create both account and storage calculators for V2 proofs.
// The storage calculator is wrapped in Rc<RefCell<...>> for sharing with value encoders.
let (mut v2_account_calculator, v2_storage_calculator) = if self.v2_enabled {
let account_trie_cursor = provider.account_trie_cursor()?;
let account_hashed_cursor = provider.hashed_account_cursor()?;
let storage_trie_cursor = provider.storage_trie_cursor(B256::ZERO)?;
let storage_hashed_cursor = provider.hashed_storage_cursor(B256::ZERO)?;
(
Some(proof_v2::ProofCalculator::<
_,
_,
AsyncAccountValueEncoder<
<Factory::Provider as TrieCursorFactory>::StorageTrieCursor<'_>,
<Factory::Provider as HashedCursorFactory>::StorageCursor<'_>,
>,
>::new(account_trie_cursor, account_hashed_cursor)),
Some(Rc::new(RefCell::new(proof_v2::StorageProofCalculator::new_storage(
storage_trie_cursor,
storage_hashed_cursor,
)))),
)
} else {
None
(None, None)
};
// Count this worker as available only after successful initialization.
self.available_workers.fetch_add(1, Ordering::Relaxed);
let mut total_idle_time = Duration::ZERO;
let mut idle_start = Instant::now();
let mut value_encoder_stats_cache = ValueEncoderStats::default();
while let Ok(job) = self.work_rx.recv() {
total_idle_time += idle_start.elapsed();
// Mark worker as busy.
self.available_workers.fetch_sub(1, Ordering::Relaxed);
match job {
AccountWorkerJob::AccountMultiproof { input } => {
self.process_account_multiproof(
&proof_tx,
v2_calculator.as_mut(),
let value_encoder_stats = self.process_account_multiproof(
&provider,
v2_account_calculator.as_mut(),
v2_storage_calculator.clone(),
*input,
&mut account_proofs_processed,
&mut cursor_metrics_cache,
);
total_idle_time += value_encoder_stats.storage_wait_time;
value_encoder_stats_cache.extend(&value_encoder_stats);
}
AccountWorkerJob::BlindedAccountNode { path, result_sender } => {
Self::process_blinded_node(
self.worker_id,
&proof_tx,
&provider,
path,
result_sender,
&mut account_nodes_processed,
@@ -1221,6 +1262,8 @@ where
// Mark worker as available again.
self.available_workers.fetch_add(1, Ordering::Relaxed);
idle_start = Instant::now();
}
trace!(
@@ -1228,13 +1271,16 @@ where
worker_id=self.worker_id,
account_proofs_processed,
account_nodes_processed,
total_idle_time_us = total_idle_time.as_micros(),
"Account worker shutting down"
);
#[cfg(feature = "metrics")]
{
self.metrics.record_account_nodes(account_nodes_processed as usize);
self.metrics.record_account_worker_idle_time(total_idle_time);
self.cursor_metrics.record(&mut cursor_metrics_cache);
self.metrics.record_value_encoder_stats(&value_encoder_stats_cache);
}
Ok(())
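Note: the `Rc<RefCell<...>>` wrapping exists because a single `StorageProofCalculator` (together with the cursors and buffers it owns) is created once per account worker and then shared, within that one thread, between the worker and every deferred value encoder it hands out. A minimal sketch of the ownership pattern with a stand-in calculator type:

```rust
use std::cell::RefCell;
use std::rc::Rc;

/// Hypothetical stand-in for `StorageProofCalculator`: it owns cursors and
/// buffers that are expensive to recreate, so one instance is reused per worker.
struct StorageCalculator {
    roots_computed: usize,
}

impl StorageCalculator {
    fn compute_root(&mut self, _hashed_address: [u8; 32]) -> [u8; 32] {
        self.roots_computed += 1;
        [0u8; 32] // placeholder result
    }
}

/// A value encoder only holds a cheap clone of the shared handle.
struct ValueEncoder {
    calculator: Rc<RefCell<StorageCalculator>>,
}

fn main() {
    // Created once when the worker starts, alongside its database cursors.
    let shared = Rc::new(RefCell::new(StorageCalculator { roots_computed: 0 }));

    // Each multiproof job builds a fresh encoder, but they all borrow the same
    // calculator, so cursors and buffers are reused across jobs.
    for _ in 0..3 {
        let encoder = ValueEncoder { calculator: Rc::clone(&shared) };
        let _root = encoder.calculator.borrow_mut().compute_root([0u8; 32]);
    }

    assert_eq!(shared.borrow().roots_computed, 3);
}
```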
@@ -1242,13 +1288,13 @@ where
fn compute_legacy_account_multiproof<Provider>(
&self,
proof_tx: &ProofTaskTx<Provider>,
provider: &Provider,
targets: MultiProofTargets,
mut prefix_sets: TriePrefixSets,
collect_branch_node_masks: bool,
multi_added_removed_keys: Option<Arc<MultiAddedRemovedKeys>>,
proof_cursor_metrics: &mut ProofTaskCursorMetricsCache,
) -> Result<ProofResult, ParallelStateRootError>
) -> Result<(ProofResult, Duration), ParallelStateRootError>
where
Provider: TrieCursorFactory + HashedCursorFactory,
{
@@ -1293,28 +1339,27 @@ where
cached_storage_roots: &self.cached_storage_roots,
};
let mut storage_wait_time = Duration::ZERO;
let result = build_account_multiproof_with_storage_roots(
&proof_tx.provider,
provider,
ctx,
&mut tracker,
proof_cursor_metrics,
);
&mut storage_wait_time,
)?;
let stats = tracker.finish();
result.map(|proof| ProofResult::Legacy(proof, stats))
Ok((ProofResult::Legacy(result, stats), storage_wait_time))
}
fn compute_v2_account_multiproof<Provider>(
fn compute_v2_account_multiproof<'a, Provider>(
&self,
v2_calculator: &mut proof_v2::ProofCalculator<
<Provider as TrieCursorFactory>::AccountTrieCursor<'_>,
<Provider as HashedCursorFactory>::AccountCursor<'_>,
AsyncAccountValueEncoder,
>,
v2_account_calculator: &mut V2AccountProofCalculator<'a, Provider>,
v2_storage_calculator: Rc<RefCell<V2StorageProofCalculator<'a, Provider>>>,
targets: MultiProofTargetsV2,
) -> Result<ProofResult, ParallelStateRootError>
) -> Result<(ProofResult, ValueEncoderStats), ParallelStateRootError>
where
Provider: TrieCursorFactory + HashedCursorFactory,
Provider: TrieCursorFactory + HashedCursorFactory + 'a,
{
let MultiProofTargetsV2 { mut account_targets, storage_targets } = targets;
@@ -1333,64 +1378,75 @@ where
dispatch_v2_storage_proofs(&self.storage_work_tx, &account_targets, storage_targets)?;
let mut value_encoder = AsyncAccountValueEncoder::new(
self.storage_work_tx.clone(),
storage_proof_receivers,
self.cached_storage_roots.clone(),
v2_storage_calculator,
);
let proof = DecodedMultiProofV2 {
account_proofs: v2_calculator.proof(&mut value_encoder, &mut account_targets)?,
storage_proofs: value_encoder.into_storage_proofs()?,
};
let account_proofs =
v2_account_calculator.proof(&mut value_encoder, &mut account_targets)?;
Ok(ProofResult::V2(proof))
let (storage_proofs, value_encoder_stats) = value_encoder.finalize()?;
let proof = DecodedMultiProofV2 { account_proofs, storage_proofs };
Ok((ProofResult::V2(proof), value_encoder_stats))
}
/// Processes an account multiproof request.
fn process_account_multiproof<Provider>(
///
/// Returns stats from the value encoder used during proof computation.
fn process_account_multiproof<'a, Provider>(
&self,
proof_tx: &ProofTaskTx<Provider>,
v2_calculator: Option<
&mut proof_v2::ProofCalculator<
<Provider as TrieCursorFactory>::AccountTrieCursor<'_>,
<Provider as HashedCursorFactory>::AccountCursor<'_>,
AsyncAccountValueEncoder,
>,
>,
provider: &Provider,
v2_account_calculator: Option<&mut V2AccountProofCalculator<'a, Provider>>,
v2_storage_calculator: Option<Rc<RefCell<V2StorageProofCalculator<'a, Provider>>>>,
input: AccountMultiproofInput,
account_proofs_processed: &mut u64,
cursor_metrics_cache: &mut ProofTaskCursorMetricsCache,
) where
Provider: TrieCursorFactory + HashedCursorFactory,
) -> ValueEncoderStats
where
Provider: TrieCursorFactory + HashedCursorFactory + 'a,
{
let mut proof_cursor_metrics = ProofTaskCursorMetricsCache::default();
let proof_start = Instant::now();
let (proof_result_sender, result) = match input {
let (proof_result_sender, result, value_encoder_stats) = match input {
AccountMultiproofInput::Legacy {
targets,
prefix_sets,
collect_branch_node_masks,
multi_added_removed_keys,
proof_result_sender,
} => (
proof_result_sender,
self.compute_legacy_account_multiproof(
proof_tx,
} => {
let (result, value_encoder_stats) = match self.compute_legacy_account_multiproof(
provider,
targets,
prefix_sets,
collect_branch_node_masks,
multi_added_removed_keys,
&mut proof_cursor_metrics,
),
),
AccountMultiproofInput::V2 { targets, proof_result_sender } => (
proof_result_sender,
self.compute_v2_account_multiproof::<Provider>(
v2_calculator.expect("v2 calculator provided"),
targets,
),
),
) {
Ok((proof, wait_time)) => (
Ok(proof),
ValueEncoderStats { storage_wait_time: wait_time, ..Default::default() },
),
Err(e) => (Err(e), ValueEncoderStats::default()),
};
(proof_result_sender, result, value_encoder_stats)
}
AccountMultiproofInput::V2 { targets, proof_result_sender } => {
let (result, value_encoder_stats) = match self
.compute_v2_account_multiproof::<Provider>(
v2_account_calculator.expect("v2 account calculator provided"),
v2_storage_calculator.expect("v2 storage calculator provided"),
targets,
) {
Ok((proof, stats)) => (Ok(proof), stats),
Err(e) => (Err(e), ValueEncoderStats::default()),
};
(proof_result_sender, result, value_encoder_stats)
}
};
let ProofResultContext {
@@ -1443,12 +1499,14 @@ where
#[cfg(feature = "metrics")]
// Accumulate per-proof metrics into the worker's cache
cursor_metrics_cache.extend(&proof_cursor_metrics);
value_encoder_stats
}
/// Processes a blinded account node lookup request.
fn process_blinded_node<Provider>(
worker_id: usize,
proof_tx: &ProofTaskTx<Provider>,
provider: &Provider,
path: Nibbles,
result_sender: Sender<TrieNodeProviderResult>,
account_nodes_processed: &mut u64,
@@ -1469,7 +1527,8 @@ where
);
let start = Instant::now();
let result = proof_tx.process_blinded_account_node(&path);
let account_node_provider = ProofBlindedAccountProvider::new(provider, provider);
let result = account_node_provider.trie_node(&path);
let elapsed = start.elapsed();
*account_nodes_processed += 1;
@@ -1500,11 +1559,13 @@ where
/// enabling interleaved parallelism between account trie traversal and storage proof computation.
///
/// Returns a `DecodedMultiProof` containing the account subtree and storage proofs.
/// Also accumulates the time spent waiting for storage proofs into `storage_wait_time`.
fn build_account_multiproof_with_storage_roots<P>(
provider: &P,
ctx: AccountMultiproofParams<'_>,
tracker: &mut ParallelTrieTracker,
proof_cursor_metrics: &mut ProofTaskCursorMetricsCache,
storage_wait_time: &mut Duration,
) -> Result<DecodedMultiProof, ParallelStateRootError>
where
P: TrieCursorFactory + HashedCursorFactory,
@@ -1568,6 +1629,7 @@ where
);
// Block on this specific storage proof receiver - enables interleaved
// parallelism
let wait_start = Instant::now();
let proof_msg = receiver.recv().map_err(|_| {
ParallelStateRootError::StorageRoot(
reth_execution_errors::StorageRootError::Database(
@@ -1577,6 +1639,7 @@ where
),
)
})?;
*storage_wait_time += wait_start.elapsed();
drop(_guard);
@@ -1668,7 +1731,9 @@ where
// Consume remaining storage proof receivers for accounts not encountered during trie walk.
// Done last to allow storage workers more time to complete while we finalize the account trie.
for (hashed_address, receiver) in storage_proof_receivers {
let wait_start = Instant::now();
if let Ok(proof_msg) = receiver.recv() {
*storage_wait_time += wait_start.elapsed();
let proof_result = proof_msg.result?;
let proof = Into::<Option<DecodedStorageMultiProof>>::into(proof_result)
.expect("Partial proofs are not yet supported");
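Note: `storage_wait_time` only measures the time spent blocked on a storage-proof receiver, both when blocking during the account walk and when draining leftover receivers afterwards; the rest of the account trie work is not counted. A rough sketch of that accounting with plain `std::sync::mpsc` receivers standing in for the crossbeam result channels:

```rust
use std::collections::HashMap;
use std::sync::mpsc::Receiver;
use std::time::{Duration, Instant};

type HashedAddress = [u8; 32];
type StorageProof = Vec<u8>; // placeholder proof payload

/// Drains pre-dispatched storage-proof receivers, adding only the blocking
/// time to `storage_wait_time` (sketch of the accounting added here).
fn drain_storage_proofs(
    receivers: HashMap<HashedAddress, Receiver<StorageProof>>,
    storage_wait_time: &mut Duration,
) -> HashMap<HashedAddress, StorageProof> {
    let mut proofs = HashMap::new();
    for (hashed_address, receiver) in receivers {
        let wait_start = Instant::now();
        if let Ok(proof) = receiver.recv() {
            // Only the time spent waiting on the storage worker counts.
            *storage_wait_time += wait_start.elapsed();
            proofs.insert(hashed_address, proof);
        }
    }
    proofs
}
```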

View File

@@ -1,9 +1,11 @@
use crate::value_encoder::ValueEncoderStats;
use reth_metrics::{metrics::Histogram, Metrics};
use reth_trie::{
hashed_cursor::{HashedCursorMetrics, HashedCursorMetricsCache},
trie_cursor::{TrieCursorMetrics, TrieCursorMetricsCache},
TrieType,
};
use std::time::Duration;
/// Metrics for the proof task.
#[derive(Clone, Metrics)]
@@ -13,6 +15,17 @@ pub struct ProofTaskTrieMetrics {
blinded_account_nodes: Histogram,
/// A histogram for the number of blinded storage nodes fetched.
blinded_storage_nodes: Histogram,
/// Histogram for storage worker idle time in seconds (waiting for proof jobs).
storage_worker_idle_time_seconds: Histogram,
/// Histogram for account worker idle time in seconds (waiting for proof jobs + storage
/// results).
account_worker_idle_time_seconds: Histogram,
/// Histogram for `Dispatched` deferred encoder variant count.
deferred_encoder_dispatched: Histogram,
/// Histogram for `FromCache` deferred encoder variant count.
deferred_encoder_from_cache: Histogram,
/// Histogram for `Sync` deferred encoder variant count.
deferred_encoder_sync: Histogram,
}
impl ProofTaskTrieMetrics {
@@ -25,6 +38,23 @@ impl ProofTaskTrieMetrics {
pub fn record_storage_nodes(&self, count: usize) {
self.blinded_storage_nodes.record(count as f64);
}
/// Record storage worker idle time.
pub fn record_storage_worker_idle_time(&self, duration: Duration) {
self.storage_worker_idle_time_seconds.record(duration.as_secs_f64());
}
/// Record account worker idle time.
pub fn record_account_worker_idle_time(&self, duration: Duration) {
self.account_worker_idle_time_seconds.record(duration.as_secs_f64());
}
/// Record value encoder stats (deferred encoder variant counts).
pub(crate) fn record_value_encoder_stats(&self, stats: &ValueEncoderStats) {
self.deferred_encoder_dispatched.record(stats.dispatched_count as f64);
self.deferred_encoder_from_cache.record(stats.from_cache_count as f64);
self.deferred_encoder_sync.record(stats.sync_count as f64);
}
}
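Note: each of these histograms receives one sample per worker lifetime, from the shutdown blocks shown in the proof task diff above. A small stand-in sketch of what gets recorded (plain vectors instead of the real `reth_metrics` histograms):

```rust
use std::time::Duration;

/// Stand-in sample sinks; the real code records into `reth_metrics` histograms.
#[derive(Default)]
struct WorkerShutdownMetrics {
    idle_time_seconds: Vec<f64>,
    deferred_dispatched: Vec<f64>,
    deferred_from_cache: Vec<f64>,
    deferred_sync: Vec<f64>,
}

impl WorkerShutdownMetrics {
    /// One call per worker shutdown: total idle time in seconds plus how often
    /// each deferred-encoder variant was taken over the worker's lifetime.
    fn record(&mut self, idle: Duration, dispatched: u64, from_cache: u64, sync: u64) {
        self.idle_time_seconds.push(idle.as_secs_f64());
        self.deferred_dispatched.push(dispatched as f64);
        self.deferred_from_cache.push(from_cache as f64);
        self.deferred_sync.push(sync as f64);
    }
}
```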
/// Cursor metrics for proof task operations.

View File

@@ -1,36 +1,79 @@
use crate::proof_task::{
StorageProofInput, StorageProofResult, StorageProofResultMessage, StorageWorkerJob,
};
use crate::proof_task::{StorageProofResult, StorageProofResultMessage};
use alloy_primitives::{map::B256Map, B256};
use alloy_rlp::Encodable;
use core::cell::RefCell;
use crossbeam_channel::{Receiver as CrossbeamReceiver, Sender as CrossbeamSender};
use crossbeam_channel::Receiver as CrossbeamReceiver;
use dashmap::DashMap;
use reth_execution_errors::trie::StateProofError;
use reth_primitives_traits::Account;
use reth_storage_errors::db::DatabaseError;
use reth_trie::{
proof_v2::{DeferredValueEncoder, LeafValueEncoder, Target},
hashed_cursor::HashedStorageCursor,
proof_v2::{DeferredValueEncoder, LeafValueEncoder, StorageProofCalculator},
trie_cursor::TrieStorageCursor,
ProofTrieNode,
};
use std::{rc::Rc, sync::Arc};
use std::{
rc::Rc,
sync::Arc,
time::{Duration, Instant},
};
/// Stats collected by [`AsyncAccountValueEncoder`] during proof computation.
///
/// Tracks time spent waiting for storage proofs and counts of each deferred encoder variant used.
#[derive(Debug, Default, Clone, Copy)]
pub(crate) struct ValueEncoderStats {
/// Accumulated time spent waiting for storage proof results from dispatched workers.
pub(crate) storage_wait_time: Duration,
/// Number of times the `Dispatched` variant was used (proof pre-dispatched to workers).
pub(crate) dispatched_count: u64,
/// Number of times the `FromCache` variant was used (storage root already cached).
pub(crate) from_cache_count: u64,
/// Number of times the `Sync` variant was used (synchronous computation).
pub(crate) sync_count: u64,
}
impl ValueEncoderStats {
/// Extends these stats by adding the values from another.
pub(crate) fn extend(&mut self, other: &Self) {
self.storage_wait_time += other.storage_wait_time;
self.dispatched_count += other.dispatched_count;
self.from_cache_count += other.from_cache_count;
self.sync_count += other.sync_count;
}
}
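Note: one `ValueEncoderStats` value comes back from each account multiproof job; the worker folds it into a lifetime cache with `extend` and also counts the storage wait time towards its own idle time, as shown in the worker loop above. A minimal sketch of that accumulation, mirroring the fields above:

```rust
use std::time::Duration;

/// Mirrors the shape of `ValueEncoderStats` (sketch).
#[derive(Debug, Default, Clone, Copy)]
struct Stats {
    storage_wait_time: Duration,
    dispatched_count: u64,
    from_cache_count: u64,
    sync_count: u64,
}

impl Stats {
    fn extend(&mut self, other: &Self) {
        self.storage_wait_time += other.storage_wait_time;
        self.dispatched_count += other.dispatched_count;
        self.from_cache_count += other.from_cache_count;
        self.sync_count += other.sync_count;
    }
}

fn main() {
    // One `Stats` per account multiproof job.
    let per_job = [
        Stats { storage_wait_time: Duration::from_millis(3), dispatched_count: 5, ..Default::default() },
        Stats { storage_wait_time: Duration::from_millis(1), from_cache_count: 2, sync_count: 1, ..Default::default() },
    ];

    // The worker folds them into a lifetime total recorded at shutdown, and the
    // storage wait time also counts towards the worker's idle time.
    let mut worker_totals = Stats::default();
    let mut total_idle_time = Duration::ZERO;
    for job in &per_job {
        worker_totals.extend(job);
        total_idle_time += job.storage_wait_time;
    }

    assert_eq!(worker_totals.dispatched_count, 5);
    assert_eq!(worker_totals.sync_count, 1);
    assert_eq!(total_idle_time, Duration::from_millis(4));
}
```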
/// Returned from [`AsyncAccountValueEncoder`], used to track an async storage root calculation.
pub(crate) enum AsyncAccountDeferredValueEncoder {
pub(crate) enum AsyncAccountDeferredValueEncoder<TC, HC> {
/// A storage proof job was dispatched to the worker pool.
Dispatched {
hashed_address: B256,
account: Account,
proof_result_rx: Result<CrossbeamReceiver<StorageProofResultMessage>, DatabaseError>,
// None if results shouldn't be retained for this dispatched proof.
storage_proof_results: Option<Rc<RefCell<B256Map<Vec<ProofTrieNode>>>>>,
/// Shared storage proof results.
storage_proof_results: Rc<RefCell<B256Map<Vec<ProofTrieNode>>>>,
/// Shared stats for tracking wait time and counts.
stats: Rc<RefCell<ValueEncoderStats>>,
},
FromCache {
/// The storage root was found in cache.
FromCache { account: Account, root: B256 },
/// Synchronous storage root computation.
Sync {
/// Shared storage proof calculator for computing storage roots.
storage_calculator: Rc<RefCell<StorageProofCalculator<TC, HC>>>,
hashed_address: B256,
account: Account,
root: B256,
/// Cache to store computed storage roots for future reuse.
cached_storage_roots: Arc<DashMap<B256, B256>>,
},
}
impl DeferredValueEncoder for AsyncAccountDeferredValueEncoder {
impl<TC, HC> DeferredValueEncoder for AsyncAccountDeferredValueEncoder<TC, HC>
where
TC: TrieStorageCursor,
HC: HashedStorageCursor<Value = alloy_primitives::U256>,
{
fn encode(self, buf: &mut Vec<u8>) -> Result<(), StateProofError> {
let (account, root) = match self {
Self::Dispatched {
@@ -38,7 +81,9 @@ impl DeferredValueEncoder for AsyncAccountDeferredValueEncoder {
account,
proof_result_rx,
storage_proof_results,
stats,
} => {
let wait_start = Instant::now();
let result = proof_result_rx?
.recv()
.map_err(|_| {
@@ -47,18 +92,27 @@ impl DeferredValueEncoder for AsyncAccountDeferredValueEncoder {
)))
})?
.result?;
stats.borrow_mut().storage_wait_time += wait_start.elapsed();
let StorageProofResult::V2 { root: Some(root), proof } = result else {
panic!("StorageProofResult is not V2 with root: {result:?}")
};
if let Some(storage_proof_results) = storage_proof_results.as_ref() {
storage_proof_results.borrow_mut().insert(hashed_address, proof);
}
storage_proof_results.borrow_mut().insert(hashed_address, proof);
(account, root)
}
Self::FromCache { account, root } => (account, root),
Self::Sync { storage_calculator, hashed_address, account, cached_storage_roots } => {
let mut calculator = storage_calculator.borrow_mut();
let proof = calculator.storage_proof(hashed_address, &mut [B256::ZERO.into()])?;
let storage_root = calculator
.compute_root_hash(&proof)?
.expect("storage_proof with dummy target always returns root");
cached_storage_roots.insert(hashed_address, storage_root);
(account, storage_root)
}
};
let account = account.into_trie_account(root);
@@ -67,12 +121,15 @@ impl DeferredValueEncoder for AsyncAccountDeferredValueEncoder {
}
}
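Note: the deferred encoder postpones the storage root until the account leaf is actually encoded, and resolves it in one of the three ways above. A simplified, self-contained sketch of the resolution step (stand-in types, no RLP, and a plain `HashMap` cache instead of the shared `DashMap`):

```rust
use std::cell::RefCell;
use std::collections::HashMap;
use std::rc::Rc;
use std::sync::mpsc::Receiver;

type HashedAddress = [u8; 32];
type Root = [u8; 32];

/// Stand-in for the shared synchronous storage proof calculator.
struct StorageCalculator;

impl StorageCalculator {
    fn storage_root(&mut self, _address: HashedAddress) -> Root {
        [0u8; 32] // placeholder
    }
}

/// The three ways a deferred storage root can be resolved (sketch).
enum DeferredRoot {
    /// A storage proof was pre-dispatched to the worker pool; block on its result.
    Dispatched { rx: Receiver<Root> },
    /// The root was already present in the shared cache.
    FromCache { root: Root },
    /// Nothing dispatched and nothing cached: compute the root synchronously on
    /// the shared calculator and remember it for later accounts.
    Sync {
        calculator: Rc<RefCell<StorageCalculator>>,
        address: HashedAddress,
        cache: Rc<RefCell<HashMap<HashedAddress, Root>>>,
    },
}

impl DeferredRoot {
    fn resolve(self) -> Root {
        match self {
            Self::Dispatched { rx } => rx.recv().expect("storage worker result"),
            Self::FromCache { root } => root,
            Self::Sync { calculator, address, cache } => {
                let root = calculator.borrow_mut().storage_root(address);
                cache.borrow_mut().insert(address, root);
                root
            }
        }
    }
}

fn main() {} // resolution sketch only
```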
/// Implements the [`LeafValueEncoder`] trait for accounts using a [`CrossbeamSender`] to dispatch
/// and compute storage roots asynchronously. Can also accept a set of already dispatched account
/// storage proofs, for cases where it's possible to determine some necessary accounts ahead of
/// time.
pub(crate) struct AsyncAccountValueEncoder {
storage_work_tx: CrossbeamSender<StorageWorkerJob>,
/// Implements the [`LeafValueEncoder`] trait for accounts.
///
/// Accepts a set of pre-dispatched storage proof receivers for accounts whose storage roots are
/// being computed asynchronously by worker threads.
///
/// For accounts without pre-dispatched proofs or cached roots, uses a shared
/// [`StorageProofCalculator`] to compute storage roots synchronously, reusing cursors across
/// multiple accounts.
pub(crate) struct AsyncAccountValueEncoder<TC, HC> {
/// Storage proof jobs which were dispatched ahead of time.
dispatched: B256Map<CrossbeamReceiver<StorageProofResultMessage>>,
/// Storage roots which have already been computed. This can be used only if a storage proof
@@ -81,39 +138,59 @@ pub(crate) struct AsyncAccountValueEncoder {
/// Tracks storage proof results received from the storage workers. [`Rc`] + [`RefCell`] is
/// required because [`DeferredValueEncoder`] cannot have a lifetime.
storage_proof_results: Rc<RefCell<B256Map<Vec<ProofTrieNode>>>>,
/// Shared storage proof calculator for synchronous computation. Reuses cursors and internal
/// buffers across multiple storage root calculations.
storage_calculator: Rc<RefCell<StorageProofCalculator<TC, HC>>>,
/// Shared stats for tracking wait time and variant counts.
stats: Rc<RefCell<ValueEncoderStats>>,
}
impl AsyncAccountValueEncoder {
/// Initializes a [`Self`] using a `ProofWorkerHandle` which will be used to calculate storage
/// roots asynchronously.
impl<TC, HC> AsyncAccountValueEncoder<TC, HC> {
/// Initializes a [`Self`] using a storage proof calculator which will be reused to calculate
/// storage roots synchronously.
///
/// # Parameters
/// - `dispatched`: Pre-dispatched storage proof receivers for target accounts
/// - `cached_storage_roots`: Shared cache of already-computed storage roots
/// - `storage_calculator`: Shared storage proof calculator for synchronous computation
pub(crate) fn new(
storage_work_tx: CrossbeamSender<StorageWorkerJob>,
dispatched: B256Map<CrossbeamReceiver<StorageProofResultMessage>>,
cached_storage_roots: Arc<DashMap<B256, B256>>,
storage_calculator: Rc<RefCell<StorageProofCalculator<TC, HC>>>,
) -> Self {
Self {
storage_work_tx,
dispatched,
cached_storage_roots,
storage_proof_results: Default::default(),
storage_calculator,
stats: Default::default(),
}
}
/// Consume [`Self`] and return all collected storage proofs which had been dispatched.
/// Consume [`Self`] and return all collected storage proofs along with accumulated stats.
///
/// This method collects any remaining dispatched proofs that weren't consumed during proof
/// calculation and includes their wait time in the returned stats.
///
/// # Panics
///
/// This method panics if any deferred encoders produced by [`Self::deferred_encoder`] have not
/// been dropped.
pub(crate) fn into_storage_proofs(
pub(crate) fn finalize(
self,
) -> Result<B256Map<Vec<ProofTrieNode>>, StateProofError> {
) -> Result<(B256Map<Vec<ProofTrieNode>>, ValueEncoderStats), StateProofError> {
let mut storage_proof_results = Rc::into_inner(self.storage_proof_results)
.expect("no deferred encoders are still allocated")
.into_inner();
// Any remaining dispatched proofs need to have their results collected
let mut stats = Rc::into_inner(self.stats)
.expect("no deferred encoders are still allocated")
.into_inner();
// Any remaining dispatched proofs need to have their results collected.
// These are proofs that were pre-dispatched but not consumed during proof calculation.
for (hashed_address, rx) in &self.dispatched {
let wait_start = Instant::now();
let result = rx
.recv()
.map_err(|_| {
@@ -122,6 +199,7 @@ impl AsyncAccountValueEncoder {
)))
})?
.result?;
stats.storage_wait_time += wait_start.elapsed();
let StorageProofResult::V2 { proof, .. } = result else {
panic!("StorageProofResult is not V2: {result:?}")
@@ -130,13 +208,17 @@ impl AsyncAccountValueEncoder {
storage_proof_results.insert(*hashed_address, proof);
}
Ok(storage_proof_results)
Ok((storage_proof_results, stats))
}
}
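Note: `finalize` depends on every deferred encoder having been consumed, so the `Rc` handles it cloned out are unique again; `Rc::into_inner` returning `None` is exactly the documented panic case. A small sketch of that contract:

```rust
use std::cell::RefCell;
use std::rc::Rc;

/// Sketch: the encoder clones `Rc<RefCell<..>>` handles into every deferred
/// encoder; finalization can only unwrap them once all clones have been dropped.
fn finalize(results: Rc<RefCell<Vec<u8>>>) -> Vec<u8> {
    Rc::into_inner(results)
        .expect("no deferred encoders are still allocated")
        .into_inner()
}

fn main() {
    let results = Rc::new(RefCell::new(vec![1u8, 2, 3]));

    // A deferred encoder would hold a clone like this; it must be consumed
    // (dropped) before finalization.
    let deferred_handle = Rc::clone(&results);
    drop(deferred_handle);

    assert_eq!(finalize(results), vec![1, 2, 3]);
}
```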
impl LeafValueEncoder for AsyncAccountValueEncoder {
impl<TC, HC> LeafValueEncoder for AsyncAccountValueEncoder<TC, HC>
where
TC: TrieStorageCursor,
HC: HashedStorageCursor<Value = alloy_primitives::U256>,
{
type Value = Account;
type DeferredEncoder = AsyncAccountDeferredValueEncoder;
type DeferredEncoder = AsyncAccountDeferredValueEncoder<TC, HC>;
fn deferred_encoder(
&mut self,
@@ -146,11 +228,13 @@ impl LeafValueEncoder for AsyncAccountValueEncoder {
// If the proof job has already been dispatched for this account then it's not necessary to
// dispatch another.
if let Some(rx) = self.dispatched.remove(&hashed_address) {
self.stats.borrow_mut().dispatched_count += 1;
return AsyncAccountDeferredValueEncoder::Dispatched {
hashed_address,
account,
proof_result_rx: Ok(rx),
storage_proof_results: Some(self.storage_proof_results.clone()),
storage_proof_results: self.storage_proof_results.clone(),
stats: self.stats.clone(),
}
}
@@ -159,25 +243,17 @@ impl LeafValueEncoder for AsyncAccountValueEncoder {
// If the root is already calculated then just use it directly
if let Some(root) = self.cached_storage_roots.get(&hashed_address) {
self.stats.borrow_mut().from_cache_count += 1;
return AsyncAccountDeferredValueEncoder::FromCache { account, root: *root }
}
// Create a proof input which targets a bogus key, so that we calculate the root as a
// side-effect.
let input = StorageProofInput::new(hashed_address, vec![Target::new(B256::ZERO)]);
let (tx, rx) = crossbeam_channel::bounded(1);
let proof_result_rx = self
.storage_work_tx
.send(StorageWorkerJob::StorageProof { input, proof_result_sender: tx })
.map_err(|_| DatabaseError::Other("storage workers unavailable".to_string()))
.map(|_| rx);
AsyncAccountDeferredValueEncoder::Dispatched {
// Compute storage root synchronously using the shared calculator
self.stats.borrow_mut().sync_count += 1;
AsyncAccountDeferredValueEncoder::Sync {
storage_calculator: self.storage_calculator.clone(),
hashed_address,
account,
proof_result_rx,
storage_proof_results: None,
cached_storage_roots: self.cached_storage_roots.clone(),
}
}
}
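Note: the selection order in `deferred_encoder` is: a pre-dispatched receiver wins, then the shared root cache, and only then the synchronous fallback on the shared calculator, which (unlike the removed dummy-target dispatch) no longer sends work to the storage worker pool. A condensed sketch of that decision ladder with stand-in types:

```rust
use std::collections::HashMap;
use std::sync::mpsc::Receiver;

type HashedAddress = [u8; 32];
type Root = [u8; 32];

/// Which strategy the encoder picks for a given account (sketch).
#[derive(Debug, PartialEq)]
enum Strategy {
    Dispatched,
    FromCache(Root),
    Sync,
}

/// Mirrors the ladder in `deferred_encoder`: pre-dispatched proof, then cached
/// root, then synchronous computation. (The real code moves the removed
/// receiver into the `Dispatched` variant rather than discarding it.)
fn choose(
    dispatched: &mut HashMap<HashedAddress, Receiver<Root>>,
    cached_roots: &HashMap<HashedAddress, Root>,
    address: HashedAddress,
) -> Strategy {
    if dispatched.remove(&address).is_some() {
        return Strategy::Dispatched;
    }
    if let Some(root) = cached_roots.get(&address) {
        return Strategy::FromCache(*root);
    }
    Strategy::Sync
}
```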

View File

@@ -109,39 +109,20 @@ where
T: TrieCursorFactory,
H: HashedCursorFactory,
{
// Synchronously computes the storage root for this account and RLP-encodes the resulting
// `TrieAccount` into `buf`
fn encode(self, buf: &mut Vec<u8>) -> Result<(), StateProofError> {
// Create cursors for storage proof calculation
let trie_cursor = self.trie_cursor_factory.storage_trie_cursor(self.hashed_address)?;
let hashed_cursor =
self.hashed_cursor_factory.hashed_storage_cursor(self.hashed_address)?;
// Create storage proof calculator with StorageValueEncoder
let mut storage_proof_calculator = ProofCalculator::new_storage(trie_cursor, hashed_cursor);
// Compute storage root by calling storage_proof with the root path as a target.
// This returns just the root node of the storage trie.
let proof = storage_proof_calculator
.storage_proof(self.hashed_address, &mut [B256::ZERO.into()])?;
let storage_root = storage_proof_calculator
.storage_proof(self.hashed_address, &mut [B256::ZERO.into()])
.map(|nodes| {
// Encode the root node to RLP and hash it
let root_node =
nodes.first().expect("storage_proof always returns at least the root");
root_node.node.encode(buf);
.compute_root_hash(&proof)?
.expect("storage_proof with dummy target always returns root");
let storage_root = alloy_primitives::keccak256(buf.as_slice());
// Clear the buffer so we can re-use it to encode the TrieAccount
buf.clear();
storage_root
})?;
// Combine account with storage root to create TrieAccount
let trie_account = self.account.into_trie_account(storage_root);
// Encode the trie account
trie_account.encode(buf);
Ok(())
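Note: the removed manual hashing and the new `compute_root_hash` call produce the same value; the storage root is the keccak256 hash of the RLP-encoded root node returned by the dummy-target storage proof. A one-function sketch of that relationship, assuming only `alloy_primitives` and a node that has already been RLP-encoded:

```rust
use alloy_primitives::{keccak256, B256};

/// Sketch: given the RLP encoding of the storage trie's root node (the first
/// node returned by a storage proof targeting a dummy key), the storage root is
/// its keccak256 hash. This is what the removed code computed by hand with
/// `buf`, and what `compute_root_hash` now encapsulates.
fn storage_root_from_encoded_root_node(encoded_root_node: &[u8]) -> B256 {
    keccak256(encoded_root_node)
}
```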