mirror of
https://github.com/paradigmxyz/reth.git
synced 2026-04-30 03:01:58 -04:00
Compare commits
2 Commits
devnet4
...
matt/instr
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
8cfcd6eae0 | ||
|
|
dead6127eb |
@@ -54,7 +54,7 @@ use std::{
|
||||
},
|
||||
time::Instant,
|
||||
};
|
||||
use tracing::{debug, debug_span, instrument, warn, Span};
|
||||
use tracing::{debug, debug_span, info, instrument, warn, Span};
|
||||
|
||||
pub mod bal;
|
||||
mod configured_sparse_trie;
|
||||
@@ -269,6 +269,7 @@ where
|
||||
None,
|
||||
)
|
||||
};
|
||||
info!("Spawned prewarm");
|
||||
|
||||
// Create and spawn the storage proof task
|
||||
let task_ctx = ProofTaskCtx::new(multiproof_provider_factory);
|
||||
@@ -294,7 +295,9 @@ where
|
||||
// spawn multi-proof task
|
||||
let parent_span = span.clone();
|
||||
let saved_cache = prewarm_handle.saved_cache.clone();
|
||||
info!("Spawning multiproof");
|
||||
self.executor.spawn_blocking(move || {
|
||||
info!("spawned MultiProofTask");
|
||||
let _enter = parent_span.entered();
|
||||
// Build a state provider for the multiproof task
|
||||
let provider = provider_builder.build().expect("failed to build provider");
|
||||
|
||||
@@ -21,7 +21,7 @@ use reth_trie_parallel::{
|
||||
},
|
||||
};
|
||||
use std::{collections::BTreeMap, sync::Arc, time::Instant};
|
||||
use tracing::{debug, error, instrument, trace};
|
||||
use tracing::{debug, error, info, instrument, trace};
|
||||
|
||||
/// Source of state changes, either from EVM execution or from a Block Access List.
|
||||
#[derive(Clone, Copy)]
|
||||
@@ -118,6 +118,19 @@ pub(super) enum MultiProofMessage {
|
||||
FinishedStateUpdates,
|
||||
}
|
||||
|
||||
impl MultiProofMessage {
|
||||
|
||||
pub fn kind(&self) -> &'static str {
|
||||
match self {
|
||||
MultiProofMessage::PrefetchProofs(_) => "PrefetchProofs",
|
||||
MultiProofMessage::StateUpdate(_, _) => "StateUpdate",
|
||||
MultiProofMessage::EmptyProof { .. } => "EmptyProof",
|
||||
MultiProofMessage::BlockAccessList(_) => "BlockAccessList",
|
||||
MultiProofMessage::FinishedStateUpdates => "FinishedStateUpdates",
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Handle to track proof calculation ordering.
|
||||
#[derive(Debug, Default)]
|
||||
struct ProofSequencer {
|
||||
@@ -679,8 +692,11 @@ impl MultiProofTask {
|
||||
fields(accounts = update.len(), chunks = 0)
|
||||
)]
|
||||
fn on_state_update(&mut self, source: Source, update: EvmState) -> u64 {
|
||||
let now = Instant::now();
|
||||
let hashed_state_update = evm_state_to_hashed_post_state(update);
|
||||
self.on_hashed_state_update(source, hashed_state_update)
|
||||
let res = self.on_hashed_state_update(source, hashed_state_update);
|
||||
info!("on_state_update took {:?}", now.elapsed());
|
||||
res
|
||||
}
|
||||
|
||||
/// Processes a hashed state update and dispatches multiproofs as needed.
|
||||
@@ -821,6 +837,7 @@ impl MultiProofTask {
|
||||
Ok(MultiProofMessage::PrefetchProofs(next_targets)) => {
|
||||
let next_count = next_targets.chunking_length();
|
||||
if accumulated_count + next_count > PREFETCH_MAX_BATCH_TARGETS {
|
||||
info!("exceeded chunk size");
|
||||
ctx.pending_msg =
|
||||
Some(MultiProofMessage::PrefetchProofs(next_targets));
|
||||
break;
|
||||
@@ -839,16 +856,21 @@ impl MultiProofTask {
|
||||
}
|
||||
}
|
||||
Ok(other_msg) => {
|
||||
info!("interleaved mutltiproof message type {}", other_msg.kind());
|
||||
ctx.pending_msg = Some(other_msg);
|
||||
break;
|
||||
}
|
||||
Err(_) => break,
|
||||
Err(_) => {
|
||||
info!("no more messages");
|
||||
break
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
// Process all accumulated messages in a single batch
|
||||
let num_batched = ctx.accumulated_prefetch_targets.len();
|
||||
self.metrics.prefetch_batch_size_histogram.record(num_batched as f64);
|
||||
info!("batched {num_batched} multiproofs");
|
||||
|
||||
// Merge all accumulated prefetch targets into a single dispatch payload.
|
||||
// Use drain to preserve the buffer allocation.
|
||||
@@ -977,6 +999,7 @@ impl MultiProofTask {
|
||||
sequence_number,
|
||||
SparseTrieUpdate { state, multiproof: Default::default() },
|
||||
) {
|
||||
info!("Send combined update");
|
||||
let _ = self.to_sparse_trie.send(combined_update);
|
||||
}
|
||||
|
||||
@@ -1042,6 +1065,7 @@ impl MultiProofTask {
|
||||
where
|
||||
P: AccountReader,
|
||||
{
|
||||
info!("MultiProofTask::run");
|
||||
let mut ctx = MultiproofBatchCtx::new(Instant::now());
|
||||
let mut batch_metrics = MultiproofBatchMetrics::default();
|
||||
|
||||
@@ -1057,6 +1081,9 @@ impl MultiProofTask {
|
||||
continue;
|
||||
}
|
||||
|
||||
let mut now = Instant::now();
|
||||
let mut proof_now = Instant::now();
|
||||
|
||||
// Use select_biased! to prioritize proof results over new requests.
|
||||
// This prevents new work from starving completed proofs and keeps workers healthy.
|
||||
crossbeam_channel::select_biased! {
|
||||
@@ -1071,6 +1098,9 @@ impl MultiProofTask {
|
||||
|
||||
self.multiproof_manager.on_calculation_complete();
|
||||
|
||||
info!("received new multiproof result after {:?}, took {:?}", proof_now.elapsed(), proof_result.elapsed);
|
||||
proof_now = Instant::now();
|
||||
|
||||
// Convert ProofResultMessage to SparseTrieUpdate
|
||||
match proof_result.result {
|
||||
Ok(proof_result_data) => {
|
||||
@@ -1089,6 +1119,7 @@ impl MultiProofTask {
|
||||
if let Some(combined_update) =
|
||||
self.on_proof(proof_result.sequence_number, update)
|
||||
{
|
||||
info!("dispatch proof result combined update");
|
||||
let _ = self.to_sparse_trie.send(combined_update);
|
||||
}
|
||||
}
|
||||
@@ -1125,7 +1156,9 @@ impl MultiProofTask {
|
||||
return
|
||||
}
|
||||
};
|
||||
|
||||
let elapsed = now.elapsed();
|
||||
info!("received new multiproof message {} after {:?}", msg.kind(), elapsed);
|
||||
now = Instant::now();
|
||||
if self.process_multiproof_message(msg, &mut ctx, &mut batch_metrics, &provider) {
|
||||
break 'main;
|
||||
}
|
||||
|
||||
@@ -45,7 +45,7 @@ use std::{
|
||||
},
|
||||
time::Instant,
|
||||
};
|
||||
use tracing::{debug, debug_span, instrument, trace, warn, Span};
|
||||
use tracing::{debug, debug_span, error, info, instrument, trace, warn, Span};
|
||||
|
||||
/// Determines the prewarming mode: transaction-based or BAL-based.
|
||||
pub(super) enum PrewarmMode<Tx> {
|
||||
@@ -396,6 +396,7 @@ where
|
||||
) where
|
||||
Tx: ExecutableTxFor<Evm> + Clone + Send + 'static,
|
||||
{
|
||||
info!("Running Prewarm task");
|
||||
// Spawn execution tasks based on mode
|
||||
match mode {
|
||||
PrewarmMode::Transactions(pending) => {
|
||||
@@ -408,14 +409,21 @@ where
|
||||
|
||||
let mut final_execution_outcome = None;
|
||||
let mut finished_execution = false;
|
||||
let mut now = Instant::now();
|
||||
let mut start = Instant::now();
|
||||
while let Ok(event) = self.actions_rx.recv() {
|
||||
match event {
|
||||
PrewarmTaskEvent::TerminateTransactionExecution => {
|
||||
let elapsed = start.elapsed();
|
||||
error!("terminate prewarm execution after {:?}", elapsed);
|
||||
// stop tx processing
|
||||
debug!(target: "engine::tree::prewarm", "Terminating prewarm execution");
|
||||
self.ctx.terminate_execution.store(true, Ordering::Relaxed);
|
||||
}
|
||||
PrewarmTaskEvent::Outcome { proof_targets } => {
|
||||
let elapsed = now.elapsed();
|
||||
info!("received new prewarm outcome after {:?}", elapsed);
|
||||
now = Instant::now();
|
||||
// completed executing a set of transactions
|
||||
self.send_multi_proof_targets(proof_targets);
|
||||
}
|
||||
@@ -429,6 +437,8 @@ where
|
||||
}
|
||||
}
|
||||
PrewarmTaskEvent::FinishedTxExecution { executed_transactions } => {
|
||||
let elapsed = start.elapsed();
|
||||
error!("finished prewarm execution after {:?}", elapsed);
|
||||
trace!(target: "engine::tree::payload_processor::prewarm", "Finished prewarm execution signal");
|
||||
self.ctx.metrics.transactions.set(executed_transactions as f64);
|
||||
self.ctx.metrics.transactions_histogram.record(executed_transactions as f64);
|
||||
|
||||
Reference in New Issue
Block a user