diff --git a/crates/stages/src/pipeline/mod.rs b/crates/stages/src/pipeline/mod.rs index 4e87fe6f67..95347f982b 100644 --- a/crates/stages/src/pipeline/mod.rs +++ b/crates/stages/src/pipeline/mod.rs @@ -375,12 +375,11 @@ pub(crate) type BoxedStage = Box>; #[cfg(test)] mod tests { use super::*; - use crate::{StageId, UnwindOutput}; + use crate::{test_utils::TestStage, StageId, UnwindOutput}; use assert_matches::assert_matches; use reth_db::mdbx::{self, test_utils, EnvKind}; use reth_interfaces::{consensus, provider::ProviderError, sync::NoopSyncStateUpdate}; use tokio_stream::StreamExt; - use utils::TestStage; #[test] fn record_progress_calculates_outliers() { @@ -652,59 +651,4 @@ mod tests { }))) ); } - - mod utils { - use super::*; - use async_trait::async_trait; - use std::collections::VecDeque; - - pub(crate) struct TestStage { - id: StageId, - exec_outputs: VecDeque>, - unwind_outputs: VecDeque>, - } - - impl TestStage { - pub(crate) fn new(id: StageId) -> Self { - Self { id, exec_outputs: VecDeque::new(), unwind_outputs: VecDeque::new() } - } - - pub(crate) fn add_exec(mut self, output: Result) -> Self { - self.exec_outputs.push_back(output); - self - } - - pub(crate) fn add_unwind(mut self, output: Result) -> Self { - self.unwind_outputs.push_back(output); - self - } - } - - #[async_trait] - impl Stage for TestStage { - fn id(&self) -> StageId { - self.id - } - - async fn execute( - &mut self, - _: &mut Transaction<'_, DB>, - _input: ExecInput, - ) -> Result { - self.exec_outputs - .pop_front() - .unwrap_or_else(|| panic!("Test stage {} executed too many times.", self.id)) - } - - async fn unwind( - &mut self, - _: &mut Transaction<'_, DB>, - _input: UnwindInput, - ) -> Result { - self.unwind_outputs - .pop_front() - .unwrap_or_else(|| panic!("Test stage {} unwound too many times.", self.id)) - } - } - } } diff --git a/crates/stages/src/test_utils/mod.rs b/crates/stages/src/test_utils/mod.rs index 7ec5b2e840..3b499fc697 100644 --- a/crates/stages/src/test_utils/mod.rs +++ b/crates/stages/src/test_utils/mod.rs @@ -2,14 +2,24 @@ use crate::StageId; mod macros; -mod runner; -mod test_db; - pub(crate) use macros::*; + +mod runner; pub(crate) use runner::{ ExecuteStageTestRunner, StageTestRunner, TestRunnerError, UnwindStageTestRunner, }; + +mod test_db; pub use test_db::TestTransaction; +mod stage; +pub use stage::TestStage; + +mod set; +pub use set::TestStages; + +/// The test stage id +pub const TEST_STAGE_ID: StageId = StageId("TestStage"); + /// The previous test stage id mock used for testing pub(crate) const PREV_STAGE_ID: StageId = StageId("PrevStage"); diff --git a/crates/stages/src/test_utils/set.rs b/crates/stages/src/test_utils/set.rs new file mode 100644 index 0000000000..c5b1492845 --- /dev/null +++ b/crates/stages/src/test_utils/set.rs @@ -0,0 +1,29 @@ +use super::{TestStage, TEST_STAGE_ID}; +use crate::{ExecOutput, StageError, StageSet, StageSetBuilder, UnwindOutput}; +use reth_db::database::Database; +use std::collections::VecDeque; + +#[derive(Default, Debug)] +pub struct TestStages { + exec_outputs: VecDeque>, + unwind_outputs: VecDeque>, +} + +impl TestStages { + pub fn new( + exec_outputs: VecDeque>, + unwind_outputs: VecDeque>, + ) -> Self { + Self { exec_outputs, unwind_outputs } + } +} + +impl StageSet for TestStages { + fn builder(self) -> StageSetBuilder { + StageSetBuilder::default().add_stage( + TestStage::new(TEST_STAGE_ID) + .with_exec(self.exec_outputs) + .with_unwind(self.unwind_outputs), + ) + } +} diff --git a/crates/stages/src/test_utils/stage.rs b/crates/stages/src/test_utils/stage.rs new file mode 100644 index 0000000000..bd9913f9b5 --- /dev/null +++ b/crates/stages/src/test_utils/stage.rs @@ -0,0 +1,67 @@ +use crate::{ExecInput, ExecOutput, Stage, StageError, StageId, UnwindInput, UnwindOutput}; +use reth_db::database::Database; +use reth_provider::Transaction; +use std::collections::VecDeque; + +#[derive(Debug)] +pub struct TestStage { + id: StageId, + exec_outputs: VecDeque>, + unwind_outputs: VecDeque>, +} + +impl TestStage { + pub fn new(id: StageId) -> Self { + Self { id, exec_outputs: VecDeque::new(), unwind_outputs: VecDeque::new() } + } + + pub fn with_exec(mut self, exec_outputs: VecDeque>) -> Self { + self.exec_outputs = exec_outputs; + self + } + + pub fn with_unwind( + mut self, + unwind_outputs: VecDeque>, + ) -> Self { + self.unwind_outputs = unwind_outputs; + self + } + + pub fn add_exec(mut self, output: Result) -> Self { + self.exec_outputs.push_back(output); + self + } + + pub fn add_unwind(mut self, output: Result) -> Self { + self.unwind_outputs.push_back(output); + self + } +} + +#[async_trait::async_trait] +impl Stage for TestStage { + fn id(&self) -> StageId { + self.id + } + + async fn execute( + &mut self, + _: &mut Transaction<'_, DB>, + _input: ExecInput, + ) -> Result { + self.exec_outputs + .pop_front() + .unwrap_or_else(|| panic!("Test stage {} executed too many times.", self.id)) + } + + async fn unwind( + &mut self, + _: &mut Transaction<'_, DB>, + _input: UnwindInput, + ) -> Result { + self.unwind_outputs + .pop_front() + .unwrap_or_else(|| panic!("Test stage {} unwound too many times.", self.id)) + } +}