feat(stages): always report entities processed & total metrics (#2824)

This commit is contained in:
Alexey Shekhirin
2023-05-26 06:02:27 +04:00
committed by GitHub
parent cb829be089
commit 46e95337db
2 changed files with 32 additions and 15 deletions

View File

@@ -139,6 +139,7 @@ where
.view(|tx| stage_id.get_checkpoint(tx).ok().flatten().unwrap_or_default())
.ok()
.unwrap_or_default(),
None,
);
}
}
@@ -265,7 +266,13 @@ where
match output {
Ok(unwind_output) => {
stage_progress = unwind_output.checkpoint;
self.metrics.stage_checkpoint(stage_id, stage_progress);
self.metrics.stage_checkpoint(
stage_id,
stage_progress,
// We assume it was set in the previous execute iteration, so it
// doesn't change when we unwind.
None,
);
stage_id.save_checkpoint(tx.deref(), stage_progress)?;
self.listeners
@@ -323,7 +330,8 @@ where
.await
{
Ok(out @ ExecOutput { checkpoint, done }) => {
made_progress |= checkpoint != prev_checkpoint.unwrap_or_default();
made_progress |=
checkpoint.block_number != prev_checkpoint.unwrap_or_default().block_number;
info!(
target: "sync::pipeline",
stage = %stage_id,
@@ -332,7 +340,11 @@ where
%done,
"Stage made progress"
);
self.metrics.stage_checkpoint(stage_id, checkpoint);
self.metrics.stage_checkpoint(
stage_id,
checkpoint,
previous_stage.map(|(_, checkpoint)| checkpoint.block_number),
);
stage_id.save_checkpoint(tx.deref(), checkpoint)?;
self.listeners.notify(PipelineEvent::Ran { stage_id, result: out.clone() });

View File

@@ -1,7 +1,7 @@
use crate::StageId;
use metrics::Gauge;
use reth_metrics_derive::Metrics;
use reth_primitives::{EntitiesCheckpoint, StageCheckpoint, StageUnitCheckpoint};
use reth_primitives::{BlockNumber, EntitiesCheckpoint, StageCheckpoint, StageUnitCheckpoint};
use std::collections::HashMap;
#[derive(Metrics)]
@@ -17,28 +17,33 @@ pub(crate) struct StageMetrics {
#[derive(Default)]
pub(crate) struct Metrics {
checkpoints: HashMap<StageId, StageMetrics>,
stages: HashMap<StageId, StageMetrics>,
}
impl Metrics {
pub(crate) fn stage_checkpoint(&mut self, stage_id: StageId, checkpoint: StageCheckpoint) {
pub(crate) fn stage_checkpoint(
&mut self,
stage_id: StageId,
checkpoint: StageCheckpoint,
max_block_number: Option<BlockNumber>,
) {
let stage_metrics = self
.checkpoints
.stages
.entry(stage_id)
.or_insert_with(|| StageMetrics::new_with_labels(&[("stage", stage_id.to_string())]));
stage_metrics.checkpoint.set(checkpoint.block_number as f64);
#[allow(clippy::single_match)]
match checkpoint.stage_checkpoint {
let (processed, total) = match checkpoint.stage_checkpoint {
Some(StageUnitCheckpoint::Entities(EntitiesCheckpoint { processed, total })) => {
stage_metrics.entities_processed.set(processed as f64);
if let Some(total) = total {
stage_metrics.entities_total.set(total as f64);
}
(processed, total)
}
_ => (),
_ => (checkpoint.block_number, max_block_number),
};
stage_metrics.entities_processed.set(processed as f64);
if let Some(total) = total {
stage_metrics.entities_total.set(total as f64);
}
}
}