feat(tree): state root task multiproof metrics (#14534)

This commit is contained in:
Alexey Shekhirin
2025-02-26 12:49:04 +00:00
committed by GitHub
parent 0c96a23f8f
commit 4ab2254d08
2 changed files with 316 additions and 297 deletions

View File

@@ -50,7 +50,7 @@ const SPARSE_TRIE_INCREMENTAL_LEVEL: usize = 2;
/// - State Root Task spawned in [`StateRootTask::spawn`]
/// - It should heave at least 3 threads to be used in:
/// - Sparse Trie spawned in [`run_sparse_trie`]
/// - Multiproof computation spawned in [`MultiproofManager::spawn_multiproof`]
/// - Multiproof computation spawned in [`MultiProofManager::spawn_multiproof`]
/// - Storage root computation spawned in [`ParallelProof::multiproof`]
///
/// NOTE: this value can be greater than the available cores in the host, it
@@ -65,7 +65,7 @@ pub(crate) fn rayon_thread_pool_size() -> usize {
/// - Engine in main thread that spawns the state root task.
/// - State Root Task spawned in [`StateRootTask::spawn`]
/// - Sparse Trie spawned in [`run_sparse_trie`]
/// - Multiproof computation spawned in [`MultiproofManager::spawn_multiproof`]
/// - Multiproof computation spawned in [`MultiProofManager::spawn_multiproof`]
/// - Storage root computation spawned in [`ParallelProof::multiproof`]
pub(crate) fn has_enough_parallelism() -> bool {
std::thread::available_parallelism().is_ok_and(|num| num.get() >= 5)
@@ -334,7 +334,7 @@ fn evm_state_to_hashed_post_state(update: EvmState) -> HashedPostState {
/// Input parameters for spawning a multiproof calculation.
#[derive(Debug)]
struct MultiproofInput<Factory> {
struct MultiProofInput<Factory> {
config: StateRootConfig<Factory>,
source: Option<StateChangeSource>,
hashed_state_update: HashedPostState,
@@ -343,29 +343,43 @@ struct MultiproofInput<Factory> {
state_root_message_sender: Sender<StateRootMessage>,
}
#[derive(Metrics, Clone)]
#[metrics(scope = "tree.root")]
struct MultiProofMetrics {
/// Histogram of the number of inflight multiproofs.
pub inflight_multiproofs_histogram: Histogram,
/// Histogram of the number of pending multiproofs.
pub pending_multiproofs_histogram: Histogram,
}
/// Manages concurrent multiproof calculations.
/// Takes care of not having more calculations in flight than a given thread
/// pool size, further calculation requests are queued and spawn later, after
/// availability has been signaled.
#[derive(Debug)]
struct MultiproofManager<Factory> {
struct MultiProofManager<Factory> {
/// Maximum number of concurrent calculations.
max_concurrent: usize,
/// Currently running calculations.
inflight: usize,
/// Queued calculations.
pending: VecDeque<MultiproofInput<Factory>>,
pending: VecDeque<MultiProofInput<Factory>>,
/// Thread pool to spawn multiproof calculations.
thread_pool: Arc<rayon::ThreadPool>,
metrics: MultiProofMetrics,
}
impl<Factory> MultiproofManager<Factory>
impl<Factory> MultiProofManager<Factory>
where
Factory:
DatabaseProviderFactory<Provider: BlockReader> + StateCommitmentProvider + Clone + 'static,
{
/// Creates a new [`MultiproofManager`].
fn new(thread_pool: Arc<rayon::ThreadPool>, thread_pool_size: usize) -> Self {
/// Creates a new [`MultiProofManager`].
fn new(
thread_pool: Arc<rayon::ThreadPool>,
thread_pool_size: usize,
metrics: MultiProofMetrics,
) -> Self {
// we keep 2 threads to be used internally by [`StateRootTask`]
let max_concurrent = thread_pool_size.saturating_sub(2);
debug_assert!(max_concurrent != 0);
@@ -374,12 +388,13 @@ where
max_concurrent,
inflight: 0,
pending: VecDeque::with_capacity(max_concurrent),
metrics,
}
}
/// Spawns a new multiproof calculation or enqueues it for later if
/// `max_concurrent` are already inflight.
fn spawn_or_queue(&mut self, input: MultiproofInput<Factory>) {
fn spawn_or_queue(&mut self, input: MultiProofInput<Factory>) {
// If there are no proof targets, we can just send an empty multiproof back immediately
if input.proof_targets.is_empty() {
debug!(
@@ -395,6 +410,7 @@ where
if self.inflight >= self.max_concurrent {
self.pending.push_back(input);
self.metrics.pending_multiproofs_histogram.record(self.pending.len() as f64);
return;
}
@@ -405,8 +421,10 @@ where
/// spawn a new calculation if needed.
fn on_calculation_complete(&mut self) {
self.inflight = self.inflight.saturating_sub(1);
self.metrics.inflight_multiproofs_histogram.record(self.inflight as f64);
if let Some(input) = self.pending.pop_front() {
self.metrics.pending_multiproofs_histogram.record(self.pending.len() as f64);
self.spawn_multiproof(input);
}
}
@@ -414,14 +432,14 @@ where
/// Spawns a multiproof calculation.
fn spawn_multiproof(
&mut self,
MultiproofInput {
MultiProofInput {
config,
source,
hashed_state_update,
proof_targets,
proof_sequence_number,
state_root_message_sender,
}: MultiproofInput<Factory>,
}: MultiProofInput<Factory>,
) {
let thread_pool = self.thread_pool.clone();
@@ -447,7 +465,7 @@ where
?source,
account_targets,
storage_targets,
"Multiproof calculated",
"MultiProof calculated",
);
match result {
@@ -473,6 +491,7 @@ where
});
self.inflight += 1;
self.metrics.inflight_multiproofs_histogram.record(self.inflight as f64);
}
}
@@ -541,7 +560,7 @@ pub struct StateRootTask<Factory> {
/// Reference to the shared thread pool for parallel proof generation.
thread_pool: Arc<rayon::ThreadPool>,
/// Manages calculation of multiproofs.
multiproof_manager: MultiproofManager<Factory>,
multiproof_manager: MultiProofManager<Factory>,
/// State root task metrics
metrics: StateRootTaskMetrics,
}
@@ -561,7 +580,11 @@ where
fetched_proof_targets: Default::default(),
proof_sequencer: ProofSequencer::new(),
thread_pool: thread_pool.clone(),
multiproof_manager: MultiproofManager::new(thread_pool, rayon_thread_pool_size()),
multiproof_manager: MultiProofManager::new(
thread_pool,
rayon_thread_pool_size(),
MultiProofMetrics::default(),
),
metrics: StateRootTaskMetrics::default(),
}
}
@@ -686,7 +709,7 @@ where
let proof_targets = self.get_prefetch_proof_targets(targets);
extend_multi_proof_targets_ref(&mut self.fetched_proof_targets, &proof_targets);
self.multiproof_manager.spawn_or_queue(MultiproofInput {
self.multiproof_manager.spawn_or_queue(MultiProofInput {
config: self.config.clone(),
source: None,
hashed_state_update: Default::default(),
@@ -759,7 +782,7 @@ where
let proof_targets = get_proof_targets(&hashed_state_update, &self.fetched_proof_targets);
extend_multi_proof_targets_ref(&mut self.fetched_proof_targets, &proof_targets);
self.multiproof_manager.spawn_or_queue(MultiproofInput {
self.multiproof_manager.spawn_or_queue(MultiProofInput {
config: self.config.clone(),
source: Some(source),
hashed_state_update,
@@ -801,7 +824,7 @@ where
/// so that the proofs for accounts and storage slots that were already fetched are not
/// requested again.
/// 2. Using the proof targets, a new multiproof is calculated using
/// [`MultiproofManager::spawn_or_queue`].
/// [`MultiProofManager::spawn_or_queue`].
/// * If the list of proof targets is empty, the [`StateRootMessage::EmptyProof`] message is
/// sent back to this task along with the original state update.
/// * Otherwise, the multiproof is calculated and the [`StateRootMessage::ProofCalculated`]