mirror of
https://github.com/paradigmxyz/reth.git
synced 2026-01-09 23:38:10 -05:00
feat(observability): add phase-level observablity to newPayload processing (#18308)
Co-authored-by: YK <chiayongkang@hotmail.com> Co-authored-by: Dan Cline <6798349+Rjected@users.noreply.github.com> Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>
This commit is contained in:
@@ -163,16 +163,22 @@ pub(crate) struct BlockValidationMetrics {
|
||||
pub(crate) state_root_storage_tries_updated_total: Counter,
|
||||
/// Total number of times the parallel state root computation fell back to regular.
|
||||
pub(crate) state_root_parallel_fallback_total: Counter,
|
||||
/// Histogram of state root duration, ie the time spent blocked waiting for the state root.
|
||||
pub(crate) state_root_histogram: Histogram,
|
||||
/// Latest state root duration, ie the time spent blocked waiting for the state root.
|
||||
pub(crate) state_root_duration: Gauge,
|
||||
/// Histogram for state root duration ie the time spent blocked waiting for the state root
|
||||
pub(crate) state_root_histogram: Histogram,
|
||||
/// Trie input computation duration
|
||||
pub(crate) trie_input_duration: Histogram,
|
||||
/// Payload conversion and validation latency
|
||||
pub(crate) payload_validation_duration: Gauge,
|
||||
/// Histogram of payload validation latency
|
||||
pub(crate) payload_validation_histogram: Histogram,
|
||||
/// Payload processor spawning duration
|
||||
pub(crate) spawn_payload_processor: Histogram,
|
||||
/// Post-execution validation duration
|
||||
pub(crate) post_execution_validation_duration: Histogram,
|
||||
/// Total duration of the new payload call
|
||||
pub(crate) total_duration: Histogram,
|
||||
}
|
||||
|
||||
impl BlockValidationMetrics {
|
||||
|
||||
@@ -508,7 +508,8 @@ where
|
||||
trace!(target: "engine::tree", "invoked new payload");
|
||||
self.metrics.engine.new_payload_messages.increment(1);
|
||||
|
||||
let validation_start = Instant::now();
|
||||
// start timing for the new payload process
|
||||
let start = Instant::now();
|
||||
|
||||
// Ensures that the given payload does not violate any consensus rules that concern the
|
||||
// block's layout, like:
|
||||
@@ -537,10 +538,6 @@ where
|
||||
// This validation **MUST** be instantly run in all cases even during active sync process.
|
||||
let parent_hash = payload.parent_hash();
|
||||
|
||||
self.metrics
|
||||
.block_validation
|
||||
.record_payload_validation(validation_start.elapsed().as_secs_f64());
|
||||
|
||||
let num_hash = payload.num_hash();
|
||||
let engine_event = ConsensusEngineEvent::BlockReceived(num_hash);
|
||||
self.emit_event(EngineApiEvent::BeaconConsensus(engine_event));
|
||||
@@ -569,6 +566,8 @@ where
|
||||
let status = self.on_invalid_new_payload(block.into_sealed_block(), invalid)?;
|
||||
return Ok(TreeOutcome::new(status))
|
||||
}
|
||||
// record pre-execution phase duration
|
||||
self.metrics.block_validation.record_payload_validation(start.elapsed().as_secs_f64());
|
||||
|
||||
let status = if self.backfill_sync_state.is_idle() {
|
||||
let mut latest_valid_hash = None;
|
||||
@@ -625,6 +624,9 @@ where
|
||||
}
|
||||
}
|
||||
|
||||
// record total newPayload duration
|
||||
self.metrics.block_validation.total_duration.record(start.elapsed().as_secs_f64());
|
||||
|
||||
Ok(outcome)
|
||||
}
|
||||
|
||||
@@ -663,7 +665,7 @@ where
|
||||
warn!(target: "engine::tree", current_hash=?current_hash, "Sidechain block not found in TreeState");
|
||||
// This should never happen as we're walking back a chain that should connect to
|
||||
// the canonical chain
|
||||
return Ok(None);
|
||||
return Ok(None)
|
||||
}
|
||||
}
|
||||
|
||||
@@ -673,7 +675,7 @@ where
|
||||
new_chain.reverse();
|
||||
|
||||
// Simple extension of the current chain
|
||||
return Ok(Some(NewCanonicalChain::Commit { new: new_chain }));
|
||||
return Ok(Some(NewCanonicalChain::Commit { new: new_chain }))
|
||||
}
|
||||
|
||||
// We have a reorg. Walk back both chains to find the fork point.
|
||||
@@ -690,7 +692,7 @@ where
|
||||
} else {
|
||||
// This shouldn't happen as we're walking back the canonical chain
|
||||
warn!(target: "engine::tree", current_hash=?old_hash, "Canonical block not found in TreeState");
|
||||
return Ok(None);
|
||||
return Ok(None)
|
||||
}
|
||||
}
|
||||
|
||||
@@ -706,7 +708,7 @@ where
|
||||
} else {
|
||||
// This shouldn't happen as we're walking back the canonical chain
|
||||
warn!(target: "engine::tree", current_hash=?old_hash, "Canonical block not found in TreeState");
|
||||
return Ok(None);
|
||||
return Ok(None)
|
||||
}
|
||||
|
||||
if let Some(block) = self.state.tree_state.executed_block_by_hash(current_hash).cloned()
|
||||
@@ -716,7 +718,7 @@ where
|
||||
} else {
|
||||
// This shouldn't happen as we've already walked this path
|
||||
warn!(target: "engine::tree", invalid_hash=?current_hash, "New chain block not found in TreeState");
|
||||
return Ok(None);
|
||||
return Ok(None)
|
||||
}
|
||||
}
|
||||
new_chain.reverse();
|
||||
|
||||
@@ -335,7 +335,9 @@ where
|
||||
Ok(val) => val,
|
||||
Err(e) => {
|
||||
let block = self.convert_to_block(input)?;
|
||||
return Err(InsertBlockError::new(block.into_sealed_block(), e.into()).into())
|
||||
return Err(
|
||||
InsertBlockError::new(block.into_sealed_block(), e.into()).into()
|
||||
)
|
||||
}
|
||||
}
|
||||
};
|
||||
@@ -437,7 +439,8 @@ where
|
||||
// Use state root task only if prefix sets are empty, otherwise proof generation is too
|
||||
// expensive because it requires walking over the paths in the prefix set in every
|
||||
// proof.
|
||||
if trie_input.prefix_sets.is_empty() {
|
||||
let spawn_payload_processor_start = Instant::now();
|
||||
let handle = if trie_input.prefix_sets.is_empty() {
|
||||
self.payload_processor.spawn(
|
||||
env.clone(),
|
||||
txs,
|
||||
@@ -450,9 +453,25 @@ where
|
||||
debug!(target: "engine::tree", block=?block_num_hash, "Disabling state root task due to non-empty prefix sets");
|
||||
use_state_root_task = false;
|
||||
self.payload_processor.spawn_cache_exclusive(env.clone(), txs, provider_builder)
|
||||
}
|
||||
};
|
||||
|
||||
// record prewarming initialization duration
|
||||
self.metrics
|
||||
.block_validation
|
||||
.spawn_payload_processor
|
||||
.record(spawn_payload_processor_start.elapsed().as_secs_f64());
|
||||
handle
|
||||
} else {
|
||||
self.payload_processor.spawn_cache_exclusive(env.clone(), txs, provider_builder)
|
||||
let prewarming_start = Instant::now();
|
||||
let handle =
|
||||
self.payload_processor.spawn_cache_exclusive(env.clone(), txs, provider_builder);
|
||||
|
||||
// Record prewarming initialization duration
|
||||
self.metrics
|
||||
.block_validation
|
||||
.spawn_payload_processor
|
||||
.record(prewarming_start.elapsed().as_secs_f64());
|
||||
handle
|
||||
};
|
||||
|
||||
// Use cached state provider before executing, used in execution after prewarming threads
|
||||
@@ -491,6 +510,7 @@ where
|
||||
};
|
||||
}
|
||||
|
||||
let post_execution_start = Instant::now();
|
||||
trace!(target: "engine::tree", block=?block_num_hash, "Validating block consensus");
|
||||
// validate block consensus rules
|
||||
ensure_ok!(self.validate_block_inner(&block));
|
||||
@@ -519,6 +539,12 @@ where
|
||||
return Err(InsertBlockError::new(block.into_sealed_block(), err.into()).into())
|
||||
}
|
||||
|
||||
// record post-execution validation duration
|
||||
self.metrics
|
||||
.block_validation
|
||||
.post_execution_validation_duration
|
||||
.record(post_execution_start.elapsed().as_secs_f64());
|
||||
|
||||
debug!(target: "engine::tree", block=?block_num_hash, "Calculating block state root");
|
||||
|
||||
let root_time = Instant::now();
|
||||
@@ -892,7 +918,7 @@ where
|
||||
) {
|
||||
if state.invalid_headers.get(&block.hash()).is_some() {
|
||||
// we already marked this block as invalid
|
||||
return;
|
||||
return
|
||||
}
|
||||
self.invalid_block_hook.on_invalid_block(parent_header, block, output, trie_updates);
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user