feat(node-core, stages): stage preparation notification (#7156)

This commit is contained in:
Alexey Shekhirin
2024-03-15 20:02:50 +00:00
committed by GitHub
parent 4e1c56f8d0
commit 4c3ab9a955
4 changed files with 114 additions and 3 deletions

View File

@@ -71,6 +71,30 @@ impl<DB> NodeState<DB> {
/// Processes an event emitted by the pipeline
fn handle_pipeline_event(&mut self, event: PipelineEvent) {
match event {
PipelineEvent::Prepare { pipeline_stages_progress, stage_id, checkpoint, target } => {
let checkpoint = checkpoint.unwrap_or_default();
let current_stage = CurrentStage {
stage_id,
eta: match &self.current_stage {
Some(current_stage) if current_stage.stage_id == stage_id => {
current_stage.eta
}
_ => Eta::default(),
},
checkpoint,
target,
};
info!(
pipeline_stages = %pipeline_stages_progress,
stage = %stage_id,
checkpoint = %checkpoint.block_number,
target = %OptionalField(target),
"Preparing stage",
);
self.current_stage = Some(current_stage);
}
PipelineEvent::Run { pipeline_stages_progress, stage_id, checkpoint, target } => {
let checkpoint = checkpoint.unwrap_or_default();
let current_stage = CurrentStage {

View File

@@ -14,6 +14,17 @@ use std::fmt::{Display, Formatter};
/// - The pipeline will loop indefinitely unless a target block is set
#[derive(Debug, PartialEq, Eq, Clone)]
pub enum PipelineEvent {
/// Emitted when a stage is about to be prepared for a run.
Prepare {
/// Pipeline stages progress.
pipeline_stages_progress: PipelineStagesProgress,
/// The stage that is about to be run.
stage_id: StageId,
/// The previous checkpoint of the stage.
checkpoint: Option<StageCheckpoint>,
/// The block number up to which the stage is running, if known.
target: Option<BlockNumber>,
},
/// Emitted when a stage is about to be run.
Run {
/// Pipeline stages progress.

View File

@@ -371,6 +371,16 @@ where
let exec_input = ExecInput { target, checkpoint: prev_checkpoint };
self.listeners.notify(PipelineEvent::Prepare {
pipeline_stages_progress: event::PipelineStagesProgress {
current: stage_index + 1,
total: total_stages,
},
stage_id,
checkpoint: prev_checkpoint,
target,
});
if let Err(err) = stage.execute_ready(exec_input).await {
self.listeners.notify(PipelineEvent::Error { stage_id });
match on_stage_error(&self.provider_factory, stage_id, prev_checkpoint, err)? {
@@ -611,6 +621,12 @@ mod tests {
assert_eq!(
events.collect::<Vec<PipelineEvent>>().await,
vec![
PipelineEvent::Prepare {
pipeline_stages_progress: PipelineStagesProgress { current: 1, total: 2 },
stage_id: StageId::Other("A"),
checkpoint: None,
target: Some(10),
},
PipelineEvent::Run {
pipeline_stages_progress: PipelineStagesProgress { current: 1, total: 2 },
stage_id: StageId::Other("A"),
@@ -622,6 +638,12 @@ mod tests {
stage_id: StageId::Other("A"),
result: ExecOutput { checkpoint: StageCheckpoint::new(20), done: true },
},
PipelineEvent::Prepare {
pipeline_stages_progress: PipelineStagesProgress { current: 2, total: 2 },
stage_id: StageId::Other("B"),
checkpoint: None,
target: Some(10),
},
PipelineEvent::Run {
pipeline_stages_progress: PipelineStagesProgress { current: 2, total: 2 },
stage_id: StageId::Other("B"),
@@ -683,6 +705,12 @@ mod tests {
events.collect::<Vec<PipelineEvent>>().await,
vec![
// Executing
PipelineEvent::Prepare {
pipeline_stages_progress: PipelineStagesProgress { current: 1, total: 3 },
stage_id: StageId::Other("A"),
checkpoint: None,
target: Some(10),
},
PipelineEvent::Run {
pipeline_stages_progress: PipelineStagesProgress { current: 1, total: 3 },
stage_id: StageId::Other("A"),
@@ -694,6 +722,12 @@ mod tests {
stage_id: StageId::Other("A"),
result: ExecOutput { checkpoint: StageCheckpoint::new(100), done: true },
},
PipelineEvent::Prepare {
pipeline_stages_progress: PipelineStagesProgress { current: 2, total: 3 },
stage_id: StageId::Other("B"),
checkpoint: None,
target: Some(10),
},
PipelineEvent::Run {
pipeline_stages_progress: PipelineStagesProgress { current: 2, total: 3 },
stage_id: StageId::Other("B"),
@@ -705,6 +739,12 @@ mod tests {
stage_id: StageId::Other("B"),
result: ExecOutput { checkpoint: StageCheckpoint::new(10), done: true },
},
PipelineEvent::Prepare {
pipeline_stages_progress: PipelineStagesProgress { current: 3, total: 3 },
stage_id: StageId::Other("C"),
checkpoint: None,
target: Some(10),
},
PipelineEvent::Run {
pipeline_stages_progress: PipelineStagesProgress { current: 3, total: 3 },
stage_id: StageId::Other("C"),
@@ -797,6 +837,12 @@ mod tests {
events.collect::<Vec<PipelineEvent>>().await,
vec![
// Executing
PipelineEvent::Prepare {
pipeline_stages_progress: PipelineStagesProgress { current: 1, total: 2 },
stage_id: StageId::Other("A"),
checkpoint: None,
target: Some(10),
},
PipelineEvent::Run {
pipeline_stages_progress: PipelineStagesProgress { current: 1, total: 2 },
stage_id: StageId::Other("A"),
@@ -808,6 +854,12 @@ mod tests {
stage_id: StageId::Other("A"),
result: ExecOutput { checkpoint: StageCheckpoint::new(100), done: true },
},
PipelineEvent::Prepare {
pipeline_stages_progress: PipelineStagesProgress { current: 2, total: 2 },
stage_id: StageId::Other("B"),
checkpoint: None,
target: Some(10),
},
PipelineEvent::Run {
pipeline_stages_progress: PipelineStagesProgress { current: 2, total: 2 },
stage_id: StageId::Other("B"),
@@ -896,6 +948,12 @@ mod tests {
assert_eq!(
events.collect::<Vec<PipelineEvent>>().await,
vec![
PipelineEvent::Prepare {
pipeline_stages_progress: PipelineStagesProgress { current: 1, total: 2 },
stage_id: StageId::Other("A"),
checkpoint: None,
target: Some(10),
},
PipelineEvent::Run {
pipeline_stages_progress: PipelineStagesProgress { current: 1, total: 2 },
stage_id: StageId::Other("A"),
@@ -907,6 +965,12 @@ mod tests {
stage_id: StageId::Other("A"),
result: ExecOutput { checkpoint: StageCheckpoint::new(10), done: true },
},
PipelineEvent::Prepare {
pipeline_stages_progress: PipelineStagesProgress { current: 2, total: 2 },
stage_id: StageId::Other("B"),
checkpoint: None,
target: Some(10),
},
PipelineEvent::Run {
pipeline_stages_progress: PipelineStagesProgress { current: 2, total: 2 },
stage_id: StageId::Other("B"),
@@ -926,6 +990,12 @@ mod tests {
stage_id: StageId::Other("A"),
result: UnwindOutput { checkpoint: StageCheckpoint::new(0) },
},
PipelineEvent::Prepare {
pipeline_stages_progress: PipelineStagesProgress { current: 1, total: 2 },
stage_id: StageId::Other("A"),
checkpoint: Some(StageCheckpoint::new(0)),
target: Some(10),
},
PipelineEvent::Run {
pipeline_stages_progress: PipelineStagesProgress { current: 1, total: 2 },
stage_id: StageId::Other("A"),
@@ -937,6 +1007,12 @@ mod tests {
stage_id: StageId::Other("A"),
result: ExecOutput { checkpoint: StageCheckpoint::new(10), done: true },
},
PipelineEvent::Prepare {
pipeline_stages_progress: PipelineStagesProgress { current: 2, total: 2 },
stage_id: StageId::Other("B"),
checkpoint: None,
target: Some(10),
},
PipelineEvent::Run {
pipeline_stages_progress: PipelineStagesProgress { current: 2, total: 2 },
stage_id: StageId::Other("B"),

View File

@@ -120,7 +120,7 @@ where
for (index, header) in self.header_collector.iter()?.enumerate() {
let (_, header_buf) = header?;
if index > 0 && index % interval == 0 {
if index > 0 && index % interval == 0 && total_headers > 100 {
info!(target: "sync::stages::headers", progress = %format!("{:.2}%", (index as f64 / total_headers as f64) * 100.0), "Writing headers");
}
@@ -146,7 +146,7 @@ where
writer.append_header(header, td, header_hash)?;
}
info!(target: "sync::stages::headers", total = total_headers, "Writing header hash index");
info!(target: "sync::stages::headers", total = total_headers, "Writing headers hash index");
let mut cursor_header_numbers = tx.cursor_write::<RawTable<tables::HeaderNumbers>>()?;
let mut first_sync = false;
@@ -166,7 +166,7 @@ where
for (index, hash_to_number) in self.hash_collector.iter()?.enumerate() {
let (hash, number) = hash_to_number?;
if index > 0 && index % interval == 0 {
if index > 0 && index % interval == 0 && total_headers > 100 {
info!(target: "sync::stages::headers", progress = %format!("{:.2}%", (index as f64 / total_headers as f64) * 100.0), "Writing headers hash index");
}