Compare commits

...

2 Commits

Author SHA1 Message Date
Matthias Seitz
8cfcd6eae0 chore: insturmentation 2026-01-09 19:02:33 +01:00
Matthias Seitz
dead6127eb instrumentation 2026-01-09 17:13:42 +01:00
3 changed files with 52 additions and 6 deletions

View File

@@ -54,7 +54,7 @@ use std::{
},
time::Instant,
};
use tracing::{debug, debug_span, instrument, warn, Span};
use tracing::{debug, debug_span, info, instrument, warn, Span};
pub mod bal;
mod configured_sparse_trie;
@@ -269,6 +269,7 @@ where
None,
)
};
info!("Spawned prewarm");
// Create and spawn the storage proof task
let task_ctx = ProofTaskCtx::new(multiproof_provider_factory);
@@ -294,7 +295,9 @@ where
// spawn multi-proof task
let parent_span = span.clone();
let saved_cache = prewarm_handle.saved_cache.clone();
info!("Spawning multiproof");
self.executor.spawn_blocking(move || {
info!("spawned MultiProofTask");
let _enter = parent_span.entered();
// Build a state provider for the multiproof task
let provider = provider_builder.build().expect("failed to build provider");

View File

@@ -21,7 +21,7 @@ use reth_trie_parallel::{
},
};
use std::{collections::BTreeMap, sync::Arc, time::Instant};
use tracing::{debug, error, instrument, trace};
use tracing::{debug, error, info, instrument, trace};
/// Source of state changes, either from EVM execution or from a Block Access List.
#[derive(Clone, Copy)]
@@ -118,6 +118,19 @@ pub(super) enum MultiProofMessage {
FinishedStateUpdates,
}
impl MultiProofMessage {
pub fn kind(&self) -> &'static str {
match self {
MultiProofMessage::PrefetchProofs(_) => "PrefetchProofs",
MultiProofMessage::StateUpdate(_, _) => "StateUpdate",
MultiProofMessage::EmptyProof { .. } => "EmptyProof",
MultiProofMessage::BlockAccessList(_) => "BlockAccessList",
MultiProofMessage::FinishedStateUpdates => "FinishedStateUpdates",
}
}
}
/// Handle to track proof calculation ordering.
#[derive(Debug, Default)]
struct ProofSequencer {
@@ -679,8 +692,11 @@ impl MultiProofTask {
fields(accounts = update.len(), chunks = 0)
)]
fn on_state_update(&mut self, source: Source, update: EvmState) -> u64 {
let now = Instant::now();
let hashed_state_update = evm_state_to_hashed_post_state(update);
self.on_hashed_state_update(source, hashed_state_update)
let res = self.on_hashed_state_update(source, hashed_state_update);
info!("on_state_update took {:?}", now.elapsed());
res
}
/// Processes a hashed state update and dispatches multiproofs as needed.
@@ -821,6 +837,7 @@ impl MultiProofTask {
Ok(MultiProofMessage::PrefetchProofs(next_targets)) => {
let next_count = next_targets.chunking_length();
if accumulated_count + next_count > PREFETCH_MAX_BATCH_TARGETS {
info!("exceeded chunk size");
ctx.pending_msg =
Some(MultiProofMessage::PrefetchProofs(next_targets));
break;
@@ -839,16 +856,21 @@ impl MultiProofTask {
}
}
Ok(other_msg) => {
info!("interleaved mutltiproof message type {}", other_msg.kind());
ctx.pending_msg = Some(other_msg);
break;
}
Err(_) => break,
Err(_) => {
info!("no more messages");
break
},
}
}
// Process all accumulated messages in a single batch
let num_batched = ctx.accumulated_prefetch_targets.len();
self.metrics.prefetch_batch_size_histogram.record(num_batched as f64);
info!("batched {num_batched} multiproofs");
// Merge all accumulated prefetch targets into a single dispatch payload.
// Use drain to preserve the buffer allocation.
@@ -977,6 +999,7 @@ impl MultiProofTask {
sequence_number,
SparseTrieUpdate { state, multiproof: Default::default() },
) {
info!("Send combined update");
let _ = self.to_sparse_trie.send(combined_update);
}
@@ -1042,6 +1065,7 @@ impl MultiProofTask {
where
P: AccountReader,
{
info!("MultiProofTask::run");
let mut ctx = MultiproofBatchCtx::new(Instant::now());
let mut batch_metrics = MultiproofBatchMetrics::default();
@@ -1057,6 +1081,9 @@ impl MultiProofTask {
continue;
}
let mut now = Instant::now();
let mut proof_now = Instant::now();
// Use select_biased! to prioritize proof results over new requests.
// This prevents new work from starving completed proofs and keeps workers healthy.
crossbeam_channel::select_biased! {
@@ -1071,6 +1098,9 @@ impl MultiProofTask {
self.multiproof_manager.on_calculation_complete();
info!("received new multiproof result after {:?}, took {:?}", proof_now.elapsed(), proof_result.elapsed);
proof_now = Instant::now();
// Convert ProofResultMessage to SparseTrieUpdate
match proof_result.result {
Ok(proof_result_data) => {
@@ -1089,6 +1119,7 @@ impl MultiProofTask {
if let Some(combined_update) =
self.on_proof(proof_result.sequence_number, update)
{
info!("dispatch proof result combined update");
let _ = self.to_sparse_trie.send(combined_update);
}
}
@@ -1125,7 +1156,9 @@ impl MultiProofTask {
return
}
};
let elapsed = now.elapsed();
info!("received new multiproof message {} after {:?}", msg.kind(), elapsed);
now = Instant::now();
if self.process_multiproof_message(msg, &mut ctx, &mut batch_metrics, &provider) {
break 'main;
}

View File

@@ -45,7 +45,7 @@ use std::{
},
time::Instant,
};
use tracing::{debug, debug_span, instrument, trace, warn, Span};
use tracing::{debug, debug_span, error, info, instrument, trace, warn, Span};
/// Determines the prewarming mode: transaction-based or BAL-based.
pub(super) enum PrewarmMode<Tx> {
@@ -396,6 +396,7 @@ where
) where
Tx: ExecutableTxFor<Evm> + Clone + Send + 'static,
{
info!("Running Prewarm task");
// Spawn execution tasks based on mode
match mode {
PrewarmMode::Transactions(pending) => {
@@ -408,14 +409,21 @@ where
let mut final_execution_outcome = None;
let mut finished_execution = false;
let mut now = Instant::now();
let mut start = Instant::now();
while let Ok(event) = self.actions_rx.recv() {
match event {
PrewarmTaskEvent::TerminateTransactionExecution => {
let elapsed = start.elapsed();
error!("terminate prewarm execution after {:?}", elapsed);
// stop tx processing
debug!(target: "engine::tree::prewarm", "Terminating prewarm execution");
self.ctx.terminate_execution.store(true, Ordering::Relaxed);
}
PrewarmTaskEvent::Outcome { proof_targets } => {
let elapsed = now.elapsed();
info!("received new prewarm outcome after {:?}", elapsed);
now = Instant::now();
// completed executing a set of transactions
self.send_multi_proof_targets(proof_targets);
}
@@ -429,6 +437,8 @@ where
}
}
PrewarmTaskEvent::FinishedTxExecution { executed_transactions } => {
let elapsed = start.elapsed();
error!("finished prewarm execution after {:?}", elapsed);
trace!(target: "engine::tree::payload_processor::prewarm", "Finished prewarm execution signal");
self.ctx.metrics.transactions.set(executed_transactions as f64);
self.ctx.metrics.transactions_histogram.record(executed_transactions as f64);