From 46e95337db789c47aaf83cb94a6eddb38a35011c Mon Sep 17 00:00:00 2001 From: Alexey Shekhirin Date: Fri, 26 May 2023 06:02:27 +0400 Subject: [PATCH] feat(stages): always report entities processed & total metrics (#2824) --- crates/stages/src/pipeline/mod.rs | 18 +++++++++++--- crates/stages/src/pipeline/sync_metrics.rs | 29 +++++++++++++--------- 2 files changed, 32 insertions(+), 15 deletions(-) diff --git a/crates/stages/src/pipeline/mod.rs b/crates/stages/src/pipeline/mod.rs index ab2f9adb85..aaaa7323f3 100644 --- a/crates/stages/src/pipeline/mod.rs +++ b/crates/stages/src/pipeline/mod.rs @@ -139,6 +139,7 @@ where .view(|tx| stage_id.get_checkpoint(tx).ok().flatten().unwrap_or_default()) .ok() .unwrap_or_default(), + None, ); } } @@ -265,7 +266,13 @@ where match output { Ok(unwind_output) => { stage_progress = unwind_output.checkpoint; - self.metrics.stage_checkpoint(stage_id, stage_progress); + self.metrics.stage_checkpoint( + stage_id, + stage_progress, + // We assume it was set in the previous execute iteration, so it + // doesn't change when we unwind. + None, + ); stage_id.save_checkpoint(tx.deref(), stage_progress)?; self.listeners @@ -323,7 +330,8 @@ where .await { Ok(out @ ExecOutput { checkpoint, done }) => { - made_progress |= checkpoint != prev_checkpoint.unwrap_or_default(); + made_progress |= + checkpoint.block_number != prev_checkpoint.unwrap_or_default().block_number; info!( target: "sync::pipeline", stage = %stage_id, @@ -332,7 +340,11 @@ where %done, "Stage made progress" ); - self.metrics.stage_checkpoint(stage_id, checkpoint); + self.metrics.stage_checkpoint( + stage_id, + checkpoint, + previous_stage.map(|(_, checkpoint)| checkpoint.block_number), + ); stage_id.save_checkpoint(tx.deref(), checkpoint)?; self.listeners.notify(PipelineEvent::Ran { stage_id, result: out.clone() }); diff --git a/crates/stages/src/pipeline/sync_metrics.rs b/crates/stages/src/pipeline/sync_metrics.rs index 04fdfafa5d..d1e4794223 100644 --- a/crates/stages/src/pipeline/sync_metrics.rs +++ b/crates/stages/src/pipeline/sync_metrics.rs @@ -1,7 +1,7 @@ use crate::StageId; use metrics::Gauge; use reth_metrics_derive::Metrics; -use reth_primitives::{EntitiesCheckpoint, StageCheckpoint, StageUnitCheckpoint}; +use reth_primitives::{BlockNumber, EntitiesCheckpoint, StageCheckpoint, StageUnitCheckpoint}; use std::collections::HashMap; #[derive(Metrics)] @@ -17,28 +17,33 @@ pub(crate) struct StageMetrics { #[derive(Default)] pub(crate) struct Metrics { - checkpoints: HashMap, + stages: HashMap, } impl Metrics { - pub(crate) fn stage_checkpoint(&mut self, stage_id: StageId, checkpoint: StageCheckpoint) { + pub(crate) fn stage_checkpoint( + &mut self, + stage_id: StageId, + checkpoint: StageCheckpoint, + max_block_number: Option, + ) { let stage_metrics = self - .checkpoints + .stages .entry(stage_id) .or_insert_with(|| StageMetrics::new_with_labels(&[("stage", stage_id.to_string())])); stage_metrics.checkpoint.set(checkpoint.block_number as f64); - #[allow(clippy::single_match)] - match checkpoint.stage_checkpoint { + let (processed, total) = match checkpoint.stage_checkpoint { Some(StageUnitCheckpoint::Entities(EntitiesCheckpoint { processed, total })) => { - stage_metrics.entities_processed.set(processed as f64); - - if let Some(total) = total { - stage_metrics.entities_total.set(total as f64); - } + (processed, total) } - _ => (), + _ => (checkpoint.block_number, max_block_number), + }; + + stage_metrics.entities_processed.set(processed as f64); + if let Some(total) = total { + stage_metrics.entities_total.set(total as f64); } } }