diff --git a/crates/node-core/src/events/node.rs b/crates/node-core/src/events/node.rs index 863340cd18..bac061935b 100644 --- a/crates/node-core/src/events/node.rs +++ b/crates/node-core/src/events/node.rs @@ -71,6 +71,30 @@ impl NodeState { /// Processes an event emitted by the pipeline fn handle_pipeline_event(&mut self, event: PipelineEvent) { match event { + PipelineEvent::Prepare { pipeline_stages_progress, stage_id, checkpoint, target } => { + let checkpoint = checkpoint.unwrap_or_default(); + let current_stage = CurrentStage { + stage_id, + eta: match &self.current_stage { + Some(current_stage) if current_stage.stage_id == stage_id => { + current_stage.eta + } + _ => Eta::default(), + }, + checkpoint, + target, + }; + + info!( + pipeline_stages = %pipeline_stages_progress, + stage = %stage_id, + checkpoint = %checkpoint.block_number, + target = %OptionalField(target), + "Preparing stage", + ); + + self.current_stage = Some(current_stage); + } PipelineEvent::Run { pipeline_stages_progress, stage_id, checkpoint, target } => { let checkpoint = checkpoint.unwrap_or_default(); let current_stage = CurrentStage { diff --git a/crates/stages/src/pipeline/event.rs b/crates/stages/src/pipeline/event.rs index d5b02610a5..1dd761d3f4 100644 --- a/crates/stages/src/pipeline/event.rs +++ b/crates/stages/src/pipeline/event.rs @@ -14,6 +14,17 @@ use std::fmt::{Display, Formatter}; /// - The pipeline will loop indefinitely unless a target block is set #[derive(Debug, PartialEq, Eq, Clone)] pub enum PipelineEvent { + /// Emitted when a stage is about to be prepared for a run. + Prepare { + /// Pipeline stages progress. + pipeline_stages_progress: PipelineStagesProgress, + /// The stage that is about to be run. + stage_id: StageId, + /// The previous checkpoint of the stage. + checkpoint: Option, + /// The block number up to which the stage is running, if known. + target: Option, + }, /// Emitted when a stage is about to be run. Run { /// Pipeline stages progress. diff --git a/crates/stages/src/pipeline/mod.rs b/crates/stages/src/pipeline/mod.rs index 9fc78846bc..e4ad70fac8 100644 --- a/crates/stages/src/pipeline/mod.rs +++ b/crates/stages/src/pipeline/mod.rs @@ -371,6 +371,16 @@ where let exec_input = ExecInput { target, checkpoint: prev_checkpoint }; + self.listeners.notify(PipelineEvent::Prepare { + pipeline_stages_progress: event::PipelineStagesProgress { + current: stage_index + 1, + total: total_stages, + }, + stage_id, + checkpoint: prev_checkpoint, + target, + }); + if let Err(err) = stage.execute_ready(exec_input).await { self.listeners.notify(PipelineEvent::Error { stage_id }); match on_stage_error(&self.provider_factory, stage_id, prev_checkpoint, err)? { @@ -611,6 +621,12 @@ mod tests { assert_eq!( events.collect::>().await, vec![ + PipelineEvent::Prepare { + pipeline_stages_progress: PipelineStagesProgress { current: 1, total: 2 }, + stage_id: StageId::Other("A"), + checkpoint: None, + target: Some(10), + }, PipelineEvent::Run { pipeline_stages_progress: PipelineStagesProgress { current: 1, total: 2 }, stage_id: StageId::Other("A"), @@ -622,6 +638,12 @@ mod tests { stage_id: StageId::Other("A"), result: ExecOutput { checkpoint: StageCheckpoint::new(20), done: true }, }, + PipelineEvent::Prepare { + pipeline_stages_progress: PipelineStagesProgress { current: 2, total: 2 }, + stage_id: StageId::Other("B"), + checkpoint: None, + target: Some(10), + }, PipelineEvent::Run { pipeline_stages_progress: PipelineStagesProgress { current: 2, total: 2 }, stage_id: StageId::Other("B"), @@ -683,6 +705,12 @@ mod tests { events.collect::>().await, vec![ // Executing + PipelineEvent::Prepare { + pipeline_stages_progress: PipelineStagesProgress { current: 1, total: 3 }, + stage_id: StageId::Other("A"), + checkpoint: None, + target: Some(10), + }, PipelineEvent::Run { pipeline_stages_progress: PipelineStagesProgress { current: 1, total: 3 }, stage_id: StageId::Other("A"), @@ -694,6 +722,12 @@ mod tests { stage_id: StageId::Other("A"), result: ExecOutput { checkpoint: StageCheckpoint::new(100), done: true }, }, + PipelineEvent::Prepare { + pipeline_stages_progress: PipelineStagesProgress { current: 2, total: 3 }, + stage_id: StageId::Other("B"), + checkpoint: None, + target: Some(10), + }, PipelineEvent::Run { pipeline_stages_progress: PipelineStagesProgress { current: 2, total: 3 }, stage_id: StageId::Other("B"), @@ -705,6 +739,12 @@ mod tests { stage_id: StageId::Other("B"), result: ExecOutput { checkpoint: StageCheckpoint::new(10), done: true }, }, + PipelineEvent::Prepare { + pipeline_stages_progress: PipelineStagesProgress { current: 3, total: 3 }, + stage_id: StageId::Other("C"), + checkpoint: None, + target: Some(10), + }, PipelineEvent::Run { pipeline_stages_progress: PipelineStagesProgress { current: 3, total: 3 }, stage_id: StageId::Other("C"), @@ -797,6 +837,12 @@ mod tests { events.collect::>().await, vec![ // Executing + PipelineEvent::Prepare { + pipeline_stages_progress: PipelineStagesProgress { current: 1, total: 2 }, + stage_id: StageId::Other("A"), + checkpoint: None, + target: Some(10), + }, PipelineEvent::Run { pipeline_stages_progress: PipelineStagesProgress { current: 1, total: 2 }, stage_id: StageId::Other("A"), @@ -808,6 +854,12 @@ mod tests { stage_id: StageId::Other("A"), result: ExecOutput { checkpoint: StageCheckpoint::new(100), done: true }, }, + PipelineEvent::Prepare { + pipeline_stages_progress: PipelineStagesProgress { current: 2, total: 2 }, + stage_id: StageId::Other("B"), + checkpoint: None, + target: Some(10), + }, PipelineEvent::Run { pipeline_stages_progress: PipelineStagesProgress { current: 2, total: 2 }, stage_id: StageId::Other("B"), @@ -896,6 +948,12 @@ mod tests { assert_eq!( events.collect::>().await, vec![ + PipelineEvent::Prepare { + pipeline_stages_progress: PipelineStagesProgress { current: 1, total: 2 }, + stage_id: StageId::Other("A"), + checkpoint: None, + target: Some(10), + }, PipelineEvent::Run { pipeline_stages_progress: PipelineStagesProgress { current: 1, total: 2 }, stage_id: StageId::Other("A"), @@ -907,6 +965,12 @@ mod tests { stage_id: StageId::Other("A"), result: ExecOutput { checkpoint: StageCheckpoint::new(10), done: true }, }, + PipelineEvent::Prepare { + pipeline_stages_progress: PipelineStagesProgress { current: 2, total: 2 }, + stage_id: StageId::Other("B"), + checkpoint: None, + target: Some(10), + }, PipelineEvent::Run { pipeline_stages_progress: PipelineStagesProgress { current: 2, total: 2 }, stage_id: StageId::Other("B"), @@ -926,6 +990,12 @@ mod tests { stage_id: StageId::Other("A"), result: UnwindOutput { checkpoint: StageCheckpoint::new(0) }, }, + PipelineEvent::Prepare { + pipeline_stages_progress: PipelineStagesProgress { current: 1, total: 2 }, + stage_id: StageId::Other("A"), + checkpoint: Some(StageCheckpoint::new(0)), + target: Some(10), + }, PipelineEvent::Run { pipeline_stages_progress: PipelineStagesProgress { current: 1, total: 2 }, stage_id: StageId::Other("A"), @@ -937,6 +1007,12 @@ mod tests { stage_id: StageId::Other("A"), result: ExecOutput { checkpoint: StageCheckpoint::new(10), done: true }, }, + PipelineEvent::Prepare { + pipeline_stages_progress: PipelineStagesProgress { current: 2, total: 2 }, + stage_id: StageId::Other("B"), + checkpoint: None, + target: Some(10), + }, PipelineEvent::Run { pipeline_stages_progress: PipelineStagesProgress { current: 2, total: 2 }, stage_id: StageId::Other("B"), diff --git a/crates/stages/src/stages/headers.rs b/crates/stages/src/stages/headers.rs index a1e03fb0f8..7e9e25fd67 100644 --- a/crates/stages/src/stages/headers.rs +++ b/crates/stages/src/stages/headers.rs @@ -120,7 +120,7 @@ where for (index, header) in self.header_collector.iter()?.enumerate() { let (_, header_buf) = header?; - if index > 0 && index % interval == 0 { + if index > 0 && index % interval == 0 && total_headers > 100 { info!(target: "sync::stages::headers", progress = %format!("{:.2}%", (index as f64 / total_headers as f64) * 100.0), "Writing headers"); } @@ -146,7 +146,7 @@ where writer.append_header(header, td, header_hash)?; } - info!(target: "sync::stages::headers", total = total_headers, "Writing header hash index"); + info!(target: "sync::stages::headers", total = total_headers, "Writing headers hash index"); let mut cursor_header_numbers = tx.cursor_write::>()?; let mut first_sync = false; @@ -166,7 +166,7 @@ where for (index, hash_to_number) in self.hash_collector.iter()?.enumerate() { let (hash, number) = hash_to_number?; - if index > 0 && index % interval == 0 { + if index > 0 && index % interval == 0 && total_headers > 100 { info!(target: "sync::stages::headers", progress = %format!("{:.2}%", (index as f64 / total_headers as f64) * 100.0), "Writing headers hash index"); }