From 274ab547038829103533b86be3f411fd3e8edc5c Mon Sep 17 00:00:00 2001 From: Roman Krasiuk Date: Sun, 19 Mar 2023 16:37:11 +0200 Subject: [PATCH] chore(pipeline): extract helper types to separate files (#1842) --- crates/stages/src/pipeline/mod.rs | 66 ++-------------------- crates/stages/src/pipeline/progress.rs | 39 +++++++++++++ crates/stages/src/pipeline/sync_metrics.rs | 26 +++++++++ 3 files changed, 70 insertions(+), 61 deletions(-) create mode 100644 crates/stages/src/pipeline/progress.rs create mode 100644 crates/stages/src/pipeline/sync_metrics.rs diff --git a/crates/stages/src/pipeline/mod.rs b/crates/stages/src/pipeline/mod.rs index 65521a0b98..4e87fe6f67 100644 --- a/crates/stages/src/pipeline/mod.rs +++ b/crates/stages/src/pipeline/mod.rs @@ -1,12 +1,9 @@ -use crate::{error::*, util::opt, ExecInput, ExecOutput, Stage, StageError, StageId, UnwindInput}; -use metrics::Gauge; +use crate::{error::*, ExecInput, ExecOutput, Stage, StageError, StageId, UnwindInput}; use reth_db::database::Database; use reth_interfaces::sync::{SyncState, SyncStateUpdater}; -use reth_metrics_derive::Metrics; use reth_primitives::{BlockNumber, H256}; use reth_provider::Transaction; use std::{ - collections::HashMap, fmt::{Debug, Formatter}, ops::Deref, sync::Arc, @@ -18,12 +15,16 @@ use tracing::*; mod builder; mod ctrl; mod event; +mod progress; mod set; +mod sync_metrics; pub use builder::*; use ctrl::*; pub use event::*; +use progress::*; pub use set::*; +use sync_metrics::*; #[cfg_attr(doc, aquamarine::aquamarine)] /// A staged sync pipeline. @@ -368,63 +369,6 @@ impl Pipeline { } } -#[derive(Metrics)] -#[metrics(scope = "sync")] -struct StageMetrics { - /// The block number of the last commit for a stage. - checkpoint: Gauge, -} - -#[derive(Default)] -struct Metrics { - checkpoints: HashMap, -} - -impl Metrics { - fn stage_checkpoint(&mut self, stage_id: StageId, progress: u64) { - self.checkpoints - .entry(stage_id) - .or_insert_with(|| StageMetrics::new_with_labels(&[("stage", stage_id.to_string())])) - .checkpoint - .set(progress as f64); - } -} - -#[derive(Debug, Default)] -struct PipelineProgress { - /// The progress of the current stage - pub(crate) progress: Option, - /// The maximum progress achieved by any stage during the execution of the pipeline. - pub(crate) maximum_progress: Option, - /// The minimum progress achieved by any stage during the execution of the pipeline. - pub(crate) minimum_progress: Option, -} - -impl PipelineProgress { - fn update(&mut self, progress: BlockNumber) { - self.progress = Some(progress); - self.minimum_progress = opt::min(self.minimum_progress, progress); - self.maximum_progress = opt::max(self.maximum_progress, progress); - } - - /// Create a sync state from pipeline progress. - fn current_sync_state(&self, downloading: bool) -> SyncState { - match self.progress { - Some(progress) if downloading => SyncState::Downloading { target_block: progress }, - Some(progress) => SyncState::Executing { target_block: progress }, - None => SyncState::Idle, - } - } - - /// Get next control flow step - fn next_ctrl(&self) -> ControlFlow { - match self.progress { - Some(progress) => ControlFlow::Continue { progress }, - None => ControlFlow::NoProgress { stage_progress: None }, - } - } -} - /// A container for a queued stage. pub(crate) type BoxedStage = Box>; diff --git a/crates/stages/src/pipeline/progress.rs b/crates/stages/src/pipeline/progress.rs new file mode 100644 index 0000000000..51711370b6 --- /dev/null +++ b/crates/stages/src/pipeline/progress.rs @@ -0,0 +1,39 @@ +use super::ctrl::ControlFlow; +use crate::util::opt; +use reth_interfaces::sync::SyncState; +use reth_primitives::BlockNumber; + +#[derive(Debug, Default)] +pub(crate) struct PipelineProgress { + /// The progress of the current stage + pub(crate) progress: Option, + /// The maximum progress achieved by any stage during the execution of the pipeline. + pub(crate) maximum_progress: Option, + /// The minimum progress achieved by any stage during the execution of the pipeline. + pub(crate) minimum_progress: Option, +} + +impl PipelineProgress { + pub(crate) fn update(&mut self, progress: BlockNumber) { + self.progress = Some(progress); + self.minimum_progress = opt::min(self.minimum_progress, progress); + self.maximum_progress = opt::max(self.maximum_progress, progress); + } + + /// Create a sync state from pipeline progress. + pub(crate) fn current_sync_state(&self, downloading: bool) -> SyncState { + match self.progress { + Some(progress) if downloading => SyncState::Downloading { target_block: progress }, + Some(progress) => SyncState::Executing { target_block: progress }, + None => SyncState::Idle, + } + } + + /// Get next control flow step + pub(crate) fn next_ctrl(&self) -> ControlFlow { + match self.progress { + Some(progress) => ControlFlow::Continue { progress }, + None => ControlFlow::NoProgress { stage_progress: None }, + } + } +} diff --git a/crates/stages/src/pipeline/sync_metrics.rs b/crates/stages/src/pipeline/sync_metrics.rs new file mode 100644 index 0000000000..ba44ebc726 --- /dev/null +++ b/crates/stages/src/pipeline/sync_metrics.rs @@ -0,0 +1,26 @@ +use crate::StageId; +use metrics::Gauge; +use reth_metrics_derive::Metrics; +use std::collections::HashMap; + +#[derive(Metrics)] +#[metrics(scope = "sync")] +pub(crate) struct StageMetrics { + /// The block number of the last commit for a stage. + checkpoint: Gauge, +} + +#[derive(Default)] +pub(crate) struct Metrics { + checkpoints: HashMap, +} + +impl Metrics { + pub(crate) fn stage_checkpoint(&mut self, stage_id: StageId, progress: u64) { + self.checkpoints + .entry(stage_id) + .or_insert_with(|| StageMetrics::new_with_labels(&[("stage", stage_id.to_string())])) + .checkpoint + .set(progress as f64); + } +}