test(sync): pipeline intermediate unwind (#1970)

This commit is contained in:
Roman Krasiuk
2023-03-24 16:26:19 +02:00
committed by GitHub
parent d0a70e179b
commit 934b78b496

View File

@@ -578,6 +578,64 @@ mod tests {
);
}
/// Unwinds a pipeline with intermediate progress.
#[tokio::test]
async fn unwind_pipeline_with_intermediate_progress() {
let db = test_utils::create_test_db::<mdbx::WriteMap>(EnvKind::RW);
let mut pipeline: Pipeline<_, NoopSyncStateUpdate> = Pipeline::builder()
.add_stage(
TestStage::new(StageId("A"))
.add_exec(Ok(ExecOutput { stage_progress: 100, done: true }))
.add_unwind(Ok(UnwindOutput { stage_progress: 50 })),
)
.add_stage(
TestStage::new(StageId("B"))
.add_exec(Ok(ExecOutput { stage_progress: 10, done: true })),
)
.with_max_block(10)
.build();
let events = pipeline.events();
// Run pipeline
tokio::spawn(async move {
// Sync first
pipeline.run(db.clone()).await.expect("Could not run pipeline");
// Unwind
pipeline.unwind(&db, 50, None).await.expect("Could not unwind pipeline");
});
// Check that the stages were unwound in reverse order
assert_eq!(
events.collect::<Vec<PipelineEvent>>().await,
vec![
// Executing
PipelineEvent::Running { stage_id: StageId("A"), stage_progress: None },
PipelineEvent::Ran {
stage_id: StageId("A"),
result: ExecOutput { stage_progress: 100, done: true },
},
PipelineEvent::Running { stage_id: StageId("B"), stage_progress: None },
PipelineEvent::Ran {
stage_id: StageId("B"),
result: ExecOutput { stage_progress: 10, done: true },
},
// Unwinding
// Nothing to unwind in stage "B"
PipelineEvent::Skipped { stage_id: StageId("B") },
PipelineEvent::Unwinding {
stage_id: StageId("A"),
input: UnwindInput { stage_progress: 100, unwind_to: 50, bad_block: None }
},
PipelineEvent::Unwound {
stage_id: StageId("A"),
result: UnwindOutput { stage_progress: 50 },
},
]
);
}
/// Runs a pipeline that unwinds during sync.
///
/// The flow is: