diff --git a/bin/reth/src/node/events.rs b/bin/reth/src/node/events.rs index 21e2cd76c0..8b5d7c76ad 100644 --- a/bin/reth/src/node/events.rs +++ b/bin/reth/src/node/events.rs @@ -56,25 +56,36 @@ impl NodeState { /// Processes an event emitted by the pipeline fn handle_pipeline_event(&mut self, event: PipelineEvent) { match event { - PipelineEvent::Running { pipeline_position, pipeline_total, stage_id, checkpoint } => { + PipelineEvent::Running { pipeline_stages_progress, stage_id, checkpoint } => { let notable = self.current_stage.is_none(); self.current_stage = Some(stage_id); self.current_checkpoint = checkpoint.unwrap_or_default(); if notable { - info!( - pipeline_stages = %format!("{pipeline_position}/{pipeline_total}"), - stage = %stage_id, - from = self.current_checkpoint.block_number, - checkpoint = %self.current_checkpoint, - eta = %self.eta.fmt_for_stage(stage_id), - "Executing stage", - ); + if let Some(progress) = self.current_checkpoint.entities() { + info!( + pipeline_stages = %pipeline_stages_progress, + stage = %stage_id, + from = self.current_checkpoint.block_number, + checkpoint = %self.current_checkpoint.block_number, + %progress, + eta = %self.eta.fmt_for_stage(stage_id), + "Executing stage", + ); + } else { + info!( + pipeline_stages = %pipeline_stages_progress, + stage = %stage_id, + from = self.current_checkpoint.block_number, + checkpoint = %self.current_checkpoint.block_number, + eta = %self.eta.fmt_for_stage(stage_id), + "Executing stage", + ); + } } } PipelineEvent::Ran { - pipeline_position, - pipeline_total, + pipeline_stages_progress, stage_id, result: ExecOutput { checkpoint, done }, } => { @@ -84,19 +95,27 @@ impl NodeState { } self.eta.update(self.current_checkpoint); - info!( - pipeline_stages = %format!("{pipeline_position}/{pipeline_total}"), - stage = %stage_id, - block = checkpoint.block_number, - %checkpoint, - eta = %self.eta.fmt_for_stage(stage_id), - "{}", - if done { - "Stage finished executing" - } else { - "Stage committed progress" - } - ); + let message = + if done { "Stage finished executing" } else { "Stage committed progress" }; + + if let Some(progress) = checkpoint.entities() { + info!( + pipeline_stages = %pipeline_stages_progress, + stage = %stage_id, + checkpoint = %checkpoint.block_number, + %progress, + eta = %self.eta.fmt_for_stage(stage_id), + "{message}", + ); + } else { + info!( + pipeline_stages = %pipeline_stages_progress, + stage = %stage_id, + checkpoint = %checkpoint.block_number, + eta = %self.eta.fmt_for_stage(stage_id), + "{message}", + ); + } if done { self.current_stage = None; @@ -254,15 +273,27 @@ where let mut this = self.project(); while this.info_interval.poll_tick(cx).is_ready() { - if let Some(stage_id) = this.state.current_stage { - info!( - target: "reth::cli", - connected_peers = this.state.num_connected_peers(), - stage = %stage_id.to_string(), - checkpoint = %this.state.current_checkpoint, - eta = %this.state.eta.fmt_for_stage(stage_id), - "Status" - ); + if let Some(stage) = this.state.current_stage { + if let Some(progress) = this.state.current_checkpoint.entities() { + info!( + target: "reth::cli", + connected_peers = this.state.num_connected_peers(), + %stage, + checkpoint = %this.state.current_checkpoint.block_number, + %progress, + eta = %this.state.eta.fmt_for_stage(stage), + "Status" + ); + } else { + info!( + target: "reth::cli", + connected_peers = this.state.num_connected_peers(), + %stage, + checkpoint = %this.state.current_checkpoint.block_number, + eta = %this.state.eta.fmt_for_stage(stage), + "Status" + ); + } } else { info!( target: "reth::cli", diff --git a/crates/primitives/src/stage/checkpoints.rs b/crates/primitives/src/stage/checkpoints.rs index 8225eddd69..c03b5c6ca8 100644 --- a/crates/primitives/src/stage/checkpoints.rs +++ b/crates/primitives/src/stage/checkpoints.rs @@ -246,15 +246,6 @@ impl StageCheckpoint { } } -impl Display for StageCheckpoint { - fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { - match self.entities() { - Some(entities) => entities.fmt(f), - None => write!(f, "{}", self.block_number), - } - } -} - // TODO(alexey): add a merkle checkpoint. Currently it's hard because [`MerkleCheckpoint`] // is not a Copy type. /// Stage-specific checkpoint metrics. diff --git a/crates/stages/src/pipeline/event.rs b/crates/stages/src/pipeline/event.rs index 2230c4075e..05d7945d33 100644 --- a/crates/stages/src/pipeline/event.rs +++ b/crates/stages/src/pipeline/event.rs @@ -1,5 +1,6 @@ use crate::stage::{ExecOutput, UnwindInput, UnwindOutput}; use reth_primitives::stage::{StageCheckpoint, StageId}; +use std::fmt::{Display, Formatter}; /// An event emitted by a [Pipeline][crate::Pipeline]. /// @@ -12,10 +13,8 @@ use reth_primitives::stage::{StageCheckpoint, StageId}; pub enum PipelineEvent { /// Emitted when a stage is about to be run. Running { - /// 1-indexed ID of the stage that is about to be run out of total stages in the pipeline. - pipeline_position: usize, - /// Total number of stages in the pipeline. - pipeline_total: usize, + /// Pipeline stages progress. + pipeline_stages_progress: PipelineStagesProgress, /// The stage that is about to be run. stage_id: StageId, /// The previous checkpoint of the stage. @@ -23,10 +22,8 @@ pub enum PipelineEvent { }, /// Emitted when a stage has run a single time. Ran { - /// 1-indexed ID of the stage that was run out of total stages in the pipeline. - pipeline_position: usize, - /// Total number of stages in the pipeline. - pipeline_total: usize, + /// Pipeline stages progress. + pipeline_stages_progress: PipelineStagesProgress, /// The stage that was run. stage_id: StageId, /// The result of executing the stage. @@ -61,3 +58,18 @@ pub enum PipelineEvent { stage_id: StageId, }, } + +/// Pipeline stages progress. +#[derive(Debug, PartialEq, Eq, Clone)] +pub struct PipelineStagesProgress { + /// 1-indexed ID of the stage that is about to be run out of total stages in the pipeline. + pub current: usize, + /// Total number of stages in the pipeline. + pub total: usize, +} + +impl Display for PipelineStagesProgress { + fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { + write!(f, "{}/{}", self.current, self.total) + } +} diff --git a/crates/stages/src/pipeline/mod.rs b/crates/stages/src/pipeline/mod.rs index 2ff0b029ec..09e2b1bbfb 100644 --- a/crates/stages/src/pipeline/mod.rs +++ b/crates/stages/src/pipeline/mod.rs @@ -272,12 +272,23 @@ where let mut checkpoint = provider_rw.get_stage_checkpoint(stage_id)?.unwrap_or_default(); if checkpoint.block_number < to { - debug!(target: "sync::pipeline", from = %checkpoint, %to, "Unwind point too far for stage"); + debug!( + target: "sync::pipeline", + from = %checkpoint.block_number, + %to, + "Unwind point too far for stage" + ); self.listeners.notify(PipelineEvent::Skipped { stage_id }); continue } - debug!(target: "sync::pipeline", from = %checkpoint, %to, ?bad_block, "Starting unwind"); + debug!( + target: "sync::pipeline", + from = %checkpoint.block_number, + %to, + ?bad_block, + "Starting unwind" + ); while checkpoint.block_number > to { let input = UnwindInput { checkpoint, unwind_to: to, bad_block }; self.listeners.notify(PipelineEvent::Unwinding { stage_id, input }); @@ -360,8 +371,10 @@ where } self.listeners.notify(PipelineEvent::Running { - pipeline_position: stage_index + 1, - pipeline_total: total_stages, + pipeline_stages_progress: event::PipelineStagesProgress { + current: stage_index + 1, + total: total_stages, + }, stage_id, checkpoint: prev_checkpoint, }); @@ -373,14 +386,25 @@ where Ok(out @ ExecOutput { checkpoint, done }) => { made_progress |= checkpoint.block_number != prev_checkpoint.unwrap_or_default().block_number; - debug!( - target: "sync::pipeline", - stage = %stage_id, - progress = checkpoint.block_number, - %checkpoint, - %done, - "Stage committed progress" - ); + + if let Some(progress) = checkpoint.entities() { + debug!( + target: "sync::pipeline", + stage = %stage_id, + checkpoint = checkpoint.block_number, + %progress, + %done, + "Stage committed progress" + ); + } else { + debug!( + target: "sync::pipeline", + stage = %stage_id, + checkpoint = checkpoint.block_number, + %done, + "Stage committed progress" + ); + } if let Some(metrics_tx) = &mut self.metrics_tx { let _ = metrics_tx.send(MetricEvent::StageCheckpoint { stage_id, @@ -391,8 +415,10 @@ where provider_rw.save_stage_checkpoint(stage_id, checkpoint)?; self.listeners.notify(PipelineEvent::Ran { - pipeline_position: stage_index + 1, - pipeline_total: total_stages, + pipeline_stages_progress: event::PipelineStagesProgress { + current: stage_index + 1, + total: total_stages, + }, stage_id, result: out.clone(), }); @@ -579,26 +605,22 @@ mod tests { events.collect::>().await, vec![ PipelineEvent::Running { - pipeline_position: 1, - pipeline_total: 2, + pipeline_stages_progress: PipelineStagesProgress { current: 1, total: 2 }, stage_id: StageId::Other("A"), checkpoint: None }, PipelineEvent::Ran { - pipeline_position: 1, - pipeline_total: 2, + pipeline_stages_progress: PipelineStagesProgress { current: 1, total: 2 }, stage_id: StageId::Other("A"), result: ExecOutput { checkpoint: StageCheckpoint::new(20), done: true }, }, PipelineEvent::Running { - pipeline_position: 2, - pipeline_total: 2, + pipeline_stages_progress: PipelineStagesProgress { current: 2, total: 2 }, stage_id: StageId::Other("B"), checkpoint: None }, PipelineEvent::Ran { - pipeline_position: 2, - pipeline_total: 2, + pipeline_stages_progress: PipelineStagesProgress { current: 2, total: 2 }, stage_id: StageId::Other("B"), result: ExecOutput { checkpoint: StageCheckpoint::new(10), done: true }, }, @@ -646,38 +668,32 @@ mod tests { vec![ // Executing PipelineEvent::Running { - pipeline_position: 1, - pipeline_total: 3, + pipeline_stages_progress: PipelineStagesProgress { current: 1, total: 3 }, stage_id: StageId::Other("A"), checkpoint: None }, PipelineEvent::Ran { - pipeline_position: 1, - pipeline_total: 3, + pipeline_stages_progress: PipelineStagesProgress { current: 1, total: 3 }, stage_id: StageId::Other("A"), result: ExecOutput { checkpoint: StageCheckpoint::new(100), done: true }, }, PipelineEvent::Running { - pipeline_position: 2, - pipeline_total: 3, + pipeline_stages_progress: PipelineStagesProgress { current: 2, total: 3 }, stage_id: StageId::Other("B"), checkpoint: None }, PipelineEvent::Ran { - pipeline_position: 2, - pipeline_total: 3, + pipeline_stages_progress: PipelineStagesProgress { current: 2, total: 3 }, stage_id: StageId::Other("B"), result: ExecOutput { checkpoint: StageCheckpoint::new(10), done: true }, }, PipelineEvent::Running { - pipeline_position: 3, - pipeline_total: 3, + pipeline_stages_progress: PipelineStagesProgress { current: 3, total: 3 }, stage_id: StageId::Other("C"), checkpoint: None }, PipelineEvent::Ran { - pipeline_position: 3, - pipeline_total: 3, + pipeline_stages_progress: PipelineStagesProgress { current: 3, total: 3 }, stage_id: StageId::Other("C"), result: ExecOutput { checkpoint: StageCheckpoint::new(20), done: true }, }, @@ -756,26 +772,22 @@ mod tests { vec![ // Executing PipelineEvent::Running { - pipeline_position: 1, - pipeline_total: 2, + pipeline_stages_progress: PipelineStagesProgress { current: 1, total: 2 }, stage_id: StageId::Other("A"), checkpoint: None }, PipelineEvent::Ran { - pipeline_position: 1, - pipeline_total: 2, + pipeline_stages_progress: PipelineStagesProgress { current: 1, total: 2 }, stage_id: StageId::Other("A"), result: ExecOutput { checkpoint: StageCheckpoint::new(100), done: true }, }, PipelineEvent::Running { - pipeline_position: 2, - pipeline_total: 2, + pipeline_stages_progress: PipelineStagesProgress { current: 2, total: 2 }, stage_id: StageId::Other("B"), checkpoint: None }, PipelineEvent::Ran { - pipeline_position: 2, - pipeline_total: 2, + pipeline_stages_progress: PipelineStagesProgress { current: 2, total: 2 }, stage_id: StageId::Other("B"), result: ExecOutput { checkpoint: StageCheckpoint::new(10), done: true }, }, @@ -846,20 +858,17 @@ mod tests { events.collect::>().await, vec![ PipelineEvent::Running { - pipeline_position: 1, - pipeline_total: 2, + pipeline_stages_progress: PipelineStagesProgress { current: 1, total: 2 }, stage_id: StageId::Other("A"), checkpoint: None }, PipelineEvent::Ran { - pipeline_position: 1, - pipeline_total: 2, + pipeline_stages_progress: PipelineStagesProgress { current: 1, total: 2 }, stage_id: StageId::Other("A"), result: ExecOutput { checkpoint: StageCheckpoint::new(10), done: true }, }, PipelineEvent::Running { - pipeline_position: 2, - pipeline_total: 2, + pipeline_stages_progress: PipelineStagesProgress { current: 2, total: 2 }, stage_id: StageId::Other("B"), checkpoint: None }, @@ -877,26 +886,22 @@ mod tests { result: UnwindOutput { checkpoint: StageCheckpoint::new(0) }, }, PipelineEvent::Running { - pipeline_position: 1, - pipeline_total: 2, + pipeline_stages_progress: PipelineStagesProgress { current: 1, total: 2 }, stage_id: StageId::Other("A"), checkpoint: Some(StageCheckpoint::new(0)) }, PipelineEvent::Ran { - pipeline_position: 1, - pipeline_total: 2, + pipeline_stages_progress: PipelineStagesProgress { current: 1, total: 2 }, stage_id: StageId::Other("A"), result: ExecOutput { checkpoint: StageCheckpoint::new(10), done: true }, }, PipelineEvent::Running { - pipeline_position: 2, - pipeline_total: 2, + pipeline_stages_progress: PipelineStagesProgress { current: 2, total: 2 }, stage_id: StageId::Other("B"), checkpoint: None }, PipelineEvent::Ran { - pipeline_position: 2, - pipeline_total: 2, + pipeline_stages_progress: PipelineStagesProgress { current: 2, total: 2 }, stage_id: StageId::Other("B"), result: ExecOutput { checkpoint: StageCheckpoint::new(10), done: true }, }, diff --git a/crates/stages/src/stages/headers.rs b/crates/stages/src/stages/headers.rs index a8460412de..e57b736d61 100644 --- a/crates/stages/src/stages/headers.rs +++ b/crates/stages/src/stages/headers.rs @@ -206,7 +206,12 @@ where // Nothing to sync if gap.is_closed() { - info!(target: "sync::stages::headers", checkpoint = %current_checkpoint, target = ?tip, "Target block already reached"); + info!( + target: "sync::stages::headers", + checkpoint = %current_checkpoint.block_number, + target = ?tip, + "Target block already reached" + ); return Ok(ExecOutput::done(current_checkpoint)) }