From 9a0aeea543761caa84eec442450f71e314a0f16f Mon Sep 17 00:00:00 2001 From: Alexey Shekhirin Date: Thu, 1 Jun 2023 02:17:33 +0400 Subject: [PATCH] feat(bin, pipeline): report pipeline progress (#2932) --- bin/reth/src/node/events.rs | 11 ++- crates/stages/src/pipeline/event.rs | 8 +++ crates/stages/src/pipeline/mod.rs | 108 ++++++++++++++++++++++++---- 3 files changed, 113 insertions(+), 14 deletions(-) diff --git a/bin/reth/src/node/events.rs b/bin/reth/src/node/events.rs index 005b49a660..2dca909b46 100644 --- a/bin/reth/src/node/events.rs +++ b/bin/reth/src/node/events.rs @@ -37,7 +37,7 @@ impl NodeState { /// Processes an event emitted by the pipeline fn handle_pipeline_event(&mut self, event: PipelineEvent) { match event { - PipelineEvent::Running { stage_id, checkpoint } => { + PipelineEvent::Running { pipeline_position, pipeline_total, stage_id, checkpoint } => { let notable = self.current_stage.is_none(); self.current_stage = Some(stage_id); self.current_checkpoint = checkpoint.unwrap_or_default(); @@ -45,6 +45,7 @@ impl NodeState { if notable { info!( target: "reth::cli", + pipeline_stages = %format!("{pipeline_position}/{pipeline_total}"), stage = %stage_id, from = self.current_checkpoint.block_number, checkpoint = %self.current_checkpoint, @@ -52,7 +53,12 @@ impl NodeState { ); } } - PipelineEvent::Ran { stage_id, result: ExecOutput { checkpoint, done } } => { + PipelineEvent::Ran { + pipeline_position, + pipeline_total, + stage_id, + result: ExecOutput { checkpoint, done }, + } => { self.current_checkpoint = checkpoint; if done { @@ -61,6 +67,7 @@ impl NodeState { info!( target: "reth::cli", + pipeline_stages = %format!("{pipeline_position}/{pipeline_total}"), stage = %stage_id, progress = checkpoint.block_number, %checkpoint, diff --git a/crates/stages/src/pipeline/event.rs b/crates/stages/src/pipeline/event.rs index 4beb723605..2230c4075e 100644 --- a/crates/stages/src/pipeline/event.rs +++ b/crates/stages/src/pipeline/event.rs @@ -12,6 +12,10 @@ use reth_primitives::stage::{StageCheckpoint, StageId}; pub enum PipelineEvent { /// Emitted when a stage is about to be run. Running { + /// 1-indexed ID of the stage that is about to be run out of total stages in the pipeline. + pipeline_position: usize, + /// Total number of stages in the pipeline. + pipeline_total: usize, /// The stage that is about to be run. stage_id: StageId, /// The previous checkpoint of the stage. @@ -19,6 +23,10 @@ pub enum PipelineEvent { }, /// Emitted when a stage has run a single time. Ran { + /// 1-indexed ID of the stage that was run out of total stages in the pipeline. + pipeline_position: usize, + /// Total number of stages in the pipeline. + pipeline_total: usize, /// The stage that was run. stage_id: StageId, /// The result of executing the stage. diff --git a/crates/stages/src/pipeline/mod.rs b/crates/stages/src/pipeline/mod.rs index 3cfa18da6c..8e77270286 100644 --- a/crates/stages/src/pipeline/mod.rs +++ b/crates/stages/src/pipeline/mod.rs @@ -299,6 +299,8 @@ where previous_stage: Option<(StageId, StageCheckpoint)>, stage_index: usize, ) -> Result { + let total_stages = self.stages.len(); + let stage = &mut self.stages[stage_index]; let stage_id = stage.id(); let mut made_progress = false; @@ -326,7 +328,12 @@ where }) } - self.listeners.notify(PipelineEvent::Running { stage_id, checkpoint: prev_checkpoint }); + self.listeners.notify(PipelineEvent::Running { + pipeline_position: stage_index + 1, + pipeline_total: total_stages, + stage_id, + checkpoint: prev_checkpoint, + }); match stage .execute(&mut tx, ExecInput { previous_stage, checkpoint: prev_checkpoint }) @@ -350,7 +357,12 @@ where ); tx.save_stage_checkpoint(stage_id, checkpoint)?; - self.listeners.notify(PipelineEvent::Ran { stage_id, result: out.clone() }); + self.listeners.notify(PipelineEvent::Ran { + pipeline_position: stage_index + 1, + pipeline_total: total_stages, + stage_id, + result: out.clone(), + }); // TODO: Make the commit interval configurable tx.commit()?; @@ -481,13 +493,27 @@ mod tests { assert_eq!( events.collect::>().await, vec![ - PipelineEvent::Running { stage_id: StageId::Other("A"), checkpoint: None }, + PipelineEvent::Running { + pipeline_position: 1, + pipeline_total: 2, + stage_id: StageId::Other("A"), + checkpoint: None + }, PipelineEvent::Ran { + pipeline_position: 1, + pipeline_total: 2, stage_id: StageId::Other("A"), result: ExecOutput { checkpoint: StageCheckpoint::new(20), done: true }, }, - PipelineEvent::Running { stage_id: StageId::Other("B"), checkpoint: None }, + PipelineEvent::Running { + pipeline_position: 2, + pipeline_total: 2, + stage_id: StageId::Other("B"), + checkpoint: None + }, PipelineEvent::Ran { + pipeline_position: 2, + pipeline_total: 2, stage_id: StageId::Other("B"), result: ExecOutput { checkpoint: StageCheckpoint::new(10), done: true }, }, @@ -534,18 +560,39 @@ mod tests { events.collect::>().await, vec![ // Executing - PipelineEvent::Running { stage_id: StageId::Other("A"), checkpoint: None }, + PipelineEvent::Running { + pipeline_position: 1, + pipeline_total: 3, + stage_id: StageId::Other("A"), + checkpoint: None + }, PipelineEvent::Ran { + pipeline_position: 1, + pipeline_total: 3, stage_id: StageId::Other("A"), result: ExecOutput { checkpoint: StageCheckpoint::new(100), done: true }, }, - PipelineEvent::Running { stage_id: StageId::Other("B"), checkpoint: None }, + PipelineEvent::Running { + pipeline_position: 2, + pipeline_total: 3, + stage_id: StageId::Other("B"), + checkpoint: None + }, PipelineEvent::Ran { + pipeline_position: 2, + pipeline_total: 3, stage_id: StageId::Other("B"), result: ExecOutput { checkpoint: StageCheckpoint::new(10), done: true }, }, - PipelineEvent::Running { stage_id: StageId::Other("C"), checkpoint: None }, + PipelineEvent::Running { + pipeline_position: 3, + pipeline_total: 3, + stage_id: StageId::Other("C"), + checkpoint: None + }, PipelineEvent::Ran { + pipeline_position: 3, + pipeline_total: 3, stage_id: StageId::Other("C"), result: ExecOutput { checkpoint: StageCheckpoint::new(20), done: true }, }, @@ -623,13 +670,27 @@ mod tests { events.collect::>().await, vec![ // Executing - PipelineEvent::Running { stage_id: StageId::Other("A"), checkpoint: None }, + PipelineEvent::Running { + pipeline_position: 1, + pipeline_total: 2, + stage_id: StageId::Other("A"), + checkpoint: None + }, PipelineEvent::Ran { + pipeline_position: 1, + pipeline_total: 2, stage_id: StageId::Other("A"), result: ExecOutput { checkpoint: StageCheckpoint::new(100), done: true }, }, - PipelineEvent::Running { stage_id: StageId::Other("B"), checkpoint: None }, + PipelineEvent::Running { + pipeline_position: 2, + pipeline_total: 2, + stage_id: StageId::Other("B"), + checkpoint: None + }, PipelineEvent::Ran { + pipeline_position: 2, + pipeline_total: 2, stage_id: StageId::Other("B"), result: ExecOutput { checkpoint: StageCheckpoint::new(10), done: true }, }, @@ -697,12 +758,24 @@ mod tests { assert_eq!( events.collect::>().await, vec![ - PipelineEvent::Running { stage_id: StageId::Other("A"), checkpoint: None }, + PipelineEvent::Running { + pipeline_position: 1, + pipeline_total: 2, + stage_id: StageId::Other("A"), + checkpoint: None + }, PipelineEvent::Ran { + pipeline_position: 1, + pipeline_total: 2, stage_id: StageId::Other("A"), result: ExecOutput { checkpoint: StageCheckpoint::new(10), done: true }, }, - PipelineEvent::Running { stage_id: StageId::Other("B"), checkpoint: None }, + PipelineEvent::Running { + pipeline_position: 2, + pipeline_total: 2, + stage_id: StageId::Other("B"), + checkpoint: None + }, PipelineEvent::Error { stage_id: StageId::Other("B") }, PipelineEvent::Unwinding { stage_id: StageId::Other("A"), @@ -717,15 +790,26 @@ mod tests { result: UnwindOutput { checkpoint: StageCheckpoint::new(0) }, }, PipelineEvent::Running { + pipeline_position: 1, + pipeline_total: 2, stage_id: StageId::Other("A"), checkpoint: Some(StageCheckpoint::new(0)) }, PipelineEvent::Ran { + pipeline_position: 1, + pipeline_total: 2, stage_id: StageId::Other("A"), result: ExecOutput { checkpoint: StageCheckpoint::new(10), done: true }, }, - PipelineEvent::Running { stage_id: StageId::Other("B"), checkpoint: None }, + PipelineEvent::Running { + pipeline_position: 2, + pipeline_total: 2, + stage_id: StageId::Other("B"), + checkpoint: None + }, PipelineEvent::Ran { + pipeline_position: 2, + pipeline_total: 2, stage_id: StageId::Other("B"), result: ExecOutput { checkpoint: StageCheckpoint::new(10), done: true }, },