From 82cc96d0c03db9c1fd166e472dc846a05a122c99 Mon Sep 17 00:00:00 2001 From: Alexey Shekhirin Date: Wed, 2 Oct 2024 13:24:05 +0300 Subject: [PATCH] fix(stages): call post execute/unwind commit hooks (#11413) --- crates/stages/api/src/pipeline/mod.rs | 75 +++++++++++++++++++-------- crates/stages/api/src/test_utils.rs | 44 ++++++++++++++-- 2 files changed, 93 insertions(+), 26 deletions(-) diff --git a/crates/stages/api/src/pipeline/mod.rs b/crates/stages/api/src/pipeline/mod.rs index 928c43fb62..19b68b3848 100644 --- a/crates/stages/api/src/pipeline/mod.rs +++ b/crates/stages/api/src/pipeline/mod.rs @@ -463,6 +463,8 @@ impl Pipeline { self.provider_factory.static_file_provider(), )?; + stage.post_execute_commit()?; + if done { let block_number = checkpoint.block_number; return Ok(if made_progress { @@ -586,6 +588,8 @@ impl std::fmt::Debug for Pipeline { #[cfg(test)] mod tests { + use std::sync::atomic::Ordering; + use super::*; use crate::{test_utils::TestStage, UnwindOutput}; use assert_matches::assert_matches; @@ -628,15 +632,19 @@ mod tests { async fn run_pipeline() { let provider_factory = create_test_provider_factory(); + let stage_a = TestStage::new(StageId::Other("A")) + .add_exec(Ok(ExecOutput { checkpoint: StageCheckpoint::new(20), done: true })); + let (stage_a, post_execute_commit_counter_a) = stage_a.with_post_execute_commit_counter(); + let (stage_a, post_unwind_commit_counter_a) = stage_a.with_post_unwind_commit_counter(); + + let stage_b = TestStage::new(StageId::Other("B")) + .add_exec(Ok(ExecOutput { checkpoint: StageCheckpoint::new(10), done: true })); + let (stage_b, post_execute_commit_counter_b) = stage_b.with_post_execute_commit_counter(); + let (stage_b, post_unwind_commit_counter_b) = stage_b.with_post_unwind_commit_counter(); + let mut pipeline = Pipeline::::builder() - .add_stage( - TestStage::new(StageId::Other("A")) - .add_exec(Ok(ExecOutput { checkpoint: StageCheckpoint::new(20), done: true })), - ) - .add_stage( - TestStage::new(StageId::Other("B")) - .add_exec(Ok(ExecOutput { checkpoint: StageCheckpoint::new(10), done: true })), - ) + .add_stage(stage_a) + .add_stage(stage_b) .with_max_block(10) .build( provider_factory.clone(), @@ -689,6 +697,12 @@ mod tests { }, ] ); + + assert_eq!(post_execute_commit_counter_a.load(Ordering::Relaxed), 1); + assert_eq!(post_unwind_commit_counter_a.load(Ordering::Relaxed), 0); + + assert_eq!(post_execute_commit_counter_b.load(Ordering::Relaxed), 1); + assert_eq!(post_unwind_commit_counter_b.load(Ordering::Relaxed), 0); } /// Unwinds a simple pipeline. @@ -696,22 +710,28 @@ mod tests { async fn unwind_pipeline() { let provider_factory = create_test_provider_factory(); + let stage_a = TestStage::new(StageId::Other("A")) + .add_exec(Ok(ExecOutput { checkpoint: StageCheckpoint::new(100), done: true })) + .add_unwind(Ok(UnwindOutput { checkpoint: StageCheckpoint::new(1) })); + let (stage_a, post_execute_commit_counter_a) = stage_a.with_post_execute_commit_counter(); + let (stage_a, post_unwind_commit_counter_a) = stage_a.with_post_unwind_commit_counter(); + + let stage_b = TestStage::new(StageId::Other("B")) + .add_exec(Ok(ExecOutput { checkpoint: StageCheckpoint::new(10), done: true })) + .add_unwind(Ok(UnwindOutput { checkpoint: StageCheckpoint::new(1) })); + let (stage_b, post_execute_commit_counter_b) = stage_b.with_post_execute_commit_counter(); + let (stage_b, post_unwind_commit_counter_b) = stage_b.with_post_unwind_commit_counter(); + + let stage_c = TestStage::new(StageId::Other("C")) + .add_exec(Ok(ExecOutput { checkpoint: StageCheckpoint::new(20), done: true })) + .add_unwind(Ok(UnwindOutput { checkpoint: StageCheckpoint::new(1) })); + let (stage_c, post_execute_commit_counter_c) = stage_c.with_post_execute_commit_counter(); + let (stage_c, post_unwind_commit_counter_c) = stage_c.with_post_unwind_commit_counter(); + let mut pipeline = Pipeline::::builder() - .add_stage( - TestStage::new(StageId::Other("A")) - .add_exec(Ok(ExecOutput { checkpoint: StageCheckpoint::new(100), done: true })) - .add_unwind(Ok(UnwindOutput { checkpoint: StageCheckpoint::new(1) })), - ) - .add_stage( - TestStage::new(StageId::Other("B")) - .add_exec(Ok(ExecOutput { checkpoint: StageCheckpoint::new(10), done: true })) - .add_unwind(Ok(UnwindOutput { checkpoint: StageCheckpoint::new(1) })), - ) - .add_stage( - TestStage::new(StageId::Other("C")) - .add_exec(Ok(ExecOutput { checkpoint: StageCheckpoint::new(20), done: true })) - .add_unwind(Ok(UnwindOutput { checkpoint: StageCheckpoint::new(1) })), - ) + .add_stage(stage_a) + .add_stage(stage_b) + .add_stage(stage_c) .with_max_block(10) .build( provider_factory.clone(), @@ -823,6 +843,15 @@ mod tests { }, ] ); + + assert_eq!(post_execute_commit_counter_a.load(Ordering::Relaxed), 1); + assert_eq!(post_unwind_commit_counter_a.load(Ordering::Relaxed), 1); + + assert_eq!(post_execute_commit_counter_b.load(Ordering::Relaxed), 1); + assert_eq!(post_unwind_commit_counter_b.load(Ordering::Relaxed), 1); + + assert_eq!(post_execute_commit_counter_c.load(Ordering::Relaxed), 1); + assert_eq!(post_unwind_commit_counter_c.load(Ordering::Relaxed), 1); } /// Unwinds a pipeline with intermediate progress. diff --git a/crates/stages/api/src/test_utils.rs b/crates/stages/api/src/test_utils.rs index 3cd2f4bc40..1f15e55140 100644 --- a/crates/stages/api/src/test_utils.rs +++ b/crates/stages/api/src/test_utils.rs @@ -1,7 +1,13 @@ #![allow(missing_docs)] use crate::{ExecInput, ExecOutput, Stage, StageError, StageId, UnwindInput, UnwindOutput}; -use std::collections::VecDeque; +use std::{ + collections::VecDeque, + sync::{ + atomic::{AtomicUsize, Ordering}, + Arc, + }, +}; /// A test stage that can be used for testing. /// @@ -11,11 +17,19 @@ pub struct TestStage { id: StageId, exec_outputs: VecDeque>, unwind_outputs: VecDeque>, + post_execute_commit_counter: Arc, + post_unwind_commit_counter: Arc, } impl TestStage { - pub const fn new(id: StageId) -> Self { - Self { id, exec_outputs: VecDeque::new(), unwind_outputs: VecDeque::new() } + pub fn new(id: StageId) -> Self { + Self { + id, + exec_outputs: VecDeque::new(), + unwind_outputs: VecDeque::new(), + post_execute_commit_counter: Arc::new(AtomicUsize::new(0)), + post_unwind_commit_counter: Arc::new(AtomicUsize::new(0)), + } } pub fn with_exec(mut self, exec_outputs: VecDeque>) -> Self { @@ -40,6 +54,18 @@ impl TestStage { self.unwind_outputs.push_back(output); self } + + pub fn with_post_execute_commit_counter(mut self) -> (Self, Arc) { + let counter = Arc::new(AtomicUsize::new(0)); + self.post_execute_commit_counter = counter.clone(); + (self, counter) + } + + pub fn with_post_unwind_commit_counter(mut self) -> (Self, Arc) { + let counter = Arc::new(AtomicUsize::new(0)); + self.post_unwind_commit_counter = counter.clone(); + (self, counter) + } } impl Stage for TestStage { @@ -53,9 +79,21 @@ impl Stage for TestStage { .unwrap_or_else(|| panic!("Test stage {} executed too many times.", self.id)) } + fn post_execute_commit(&mut self) -> Result<(), StageError> { + self.post_execute_commit_counter.fetch_add(1, Ordering::Relaxed); + + Ok(()) + } + fn unwind(&mut self, _: &Provider, _input: UnwindInput) -> Result { self.unwind_outputs .pop_front() .unwrap_or_else(|| panic!("Test stage {} unwound too many times.", self.id)) } + + fn post_unwind_commit(&mut self) -> Result<(), StageError> { + self.post_unwind_commit_counter.fetch_add(1, Ordering::Relaxed); + + Ok(()) + } }