chore(pipeline): expose TestStage and add test stage set (#1844)

This commit is contained in:
Roman Krasiuk
2023-03-19 17:47:00 +02:00
committed by GitHub
parent 488295f56f
commit 0128d42b4b
4 changed files with 110 additions and 60 deletions

View File

@@ -375,12 +375,11 @@ pub(crate) type BoxedStage<DB> = Box<dyn Stage<DB>>;
#[cfg(test)]
mod tests {
use super::*;
use crate::{StageId, UnwindOutput};
use crate::{test_utils::TestStage, StageId, UnwindOutput};
use assert_matches::assert_matches;
use reth_db::mdbx::{self, test_utils, EnvKind};
use reth_interfaces::{consensus, provider::ProviderError, sync::NoopSyncStateUpdate};
use tokio_stream::StreamExt;
use utils::TestStage;
#[test]
fn record_progress_calculates_outliers() {
@@ -652,59 +651,4 @@ mod tests {
})))
);
}
mod utils {
use super::*;
use async_trait::async_trait;
use std::collections::VecDeque;
pub(crate) struct TestStage {
id: StageId,
exec_outputs: VecDeque<Result<ExecOutput, StageError>>,
unwind_outputs: VecDeque<Result<UnwindOutput, StageError>>,
}
impl TestStage {
pub(crate) fn new(id: StageId) -> Self {
Self { id, exec_outputs: VecDeque::new(), unwind_outputs: VecDeque::new() }
}
pub(crate) fn add_exec(mut self, output: Result<ExecOutput, StageError>) -> Self {
self.exec_outputs.push_back(output);
self
}
pub(crate) fn add_unwind(mut self, output: Result<UnwindOutput, StageError>) -> Self {
self.unwind_outputs.push_back(output);
self
}
}
#[async_trait]
impl<DB: Database> Stage<DB> for TestStage {
fn id(&self) -> StageId {
self.id
}
async fn execute(
&mut self,
_: &mut Transaction<'_, DB>,
_input: ExecInput,
) -> Result<ExecOutput, StageError> {
self.exec_outputs
.pop_front()
.unwrap_or_else(|| panic!("Test stage {} executed too many times.", self.id))
}
async fn unwind(
&mut self,
_: &mut Transaction<'_, DB>,
_input: UnwindInput,
) -> Result<UnwindOutput, StageError> {
self.unwind_outputs
.pop_front()
.unwrap_or_else(|| panic!("Test stage {} unwound too many times.", self.id))
}
}
}
}

View File

@@ -2,14 +2,24 @@
use crate::StageId;
mod macros;
mod runner;
mod test_db;
pub(crate) use macros::*;
mod runner;
pub(crate) use runner::{
ExecuteStageTestRunner, StageTestRunner, TestRunnerError, UnwindStageTestRunner,
};
mod test_db;
pub use test_db::TestTransaction;
mod stage;
pub use stage::TestStage;
mod set;
pub use set::TestStages;
/// The test stage id
pub const TEST_STAGE_ID: StageId = StageId("TestStage");
/// The previous test stage id mock used for testing
pub(crate) const PREV_STAGE_ID: StageId = StageId("PrevStage");

View File

@@ -0,0 +1,29 @@
use super::{TestStage, TEST_STAGE_ID};
use crate::{ExecOutput, StageError, StageSet, StageSetBuilder, UnwindOutput};
use reth_db::database::Database;
use std::collections::VecDeque;
#[derive(Default, Debug)]
pub struct TestStages {
exec_outputs: VecDeque<Result<ExecOutput, StageError>>,
unwind_outputs: VecDeque<Result<UnwindOutput, StageError>>,
}
impl TestStages {
pub fn new(
exec_outputs: VecDeque<Result<ExecOutput, StageError>>,
unwind_outputs: VecDeque<Result<UnwindOutput, StageError>>,
) -> Self {
Self { exec_outputs, unwind_outputs }
}
}
impl<DB: Database> StageSet<DB> for TestStages {
fn builder(self) -> StageSetBuilder<DB> {
StageSetBuilder::default().add_stage(
TestStage::new(TEST_STAGE_ID)
.with_exec(self.exec_outputs)
.with_unwind(self.unwind_outputs),
)
}
}

View File

@@ -0,0 +1,67 @@
use crate::{ExecInput, ExecOutput, Stage, StageError, StageId, UnwindInput, UnwindOutput};
use reth_db::database::Database;
use reth_provider::Transaction;
use std::collections::VecDeque;
#[derive(Debug)]
pub struct TestStage {
id: StageId,
exec_outputs: VecDeque<Result<ExecOutput, StageError>>,
unwind_outputs: VecDeque<Result<UnwindOutput, StageError>>,
}
impl TestStage {
pub fn new(id: StageId) -> Self {
Self { id, exec_outputs: VecDeque::new(), unwind_outputs: VecDeque::new() }
}
pub fn with_exec(mut self, exec_outputs: VecDeque<Result<ExecOutput, StageError>>) -> Self {
self.exec_outputs = exec_outputs;
self
}
pub fn with_unwind(
mut self,
unwind_outputs: VecDeque<Result<UnwindOutput, StageError>>,
) -> Self {
self.unwind_outputs = unwind_outputs;
self
}
pub fn add_exec(mut self, output: Result<ExecOutput, StageError>) -> Self {
self.exec_outputs.push_back(output);
self
}
pub fn add_unwind(mut self, output: Result<UnwindOutput, StageError>) -> Self {
self.unwind_outputs.push_back(output);
self
}
}
#[async_trait::async_trait]
impl<DB: Database> Stage<DB> for TestStage {
fn id(&self) -> StageId {
self.id
}
async fn execute(
&mut self,
_: &mut Transaction<'_, DB>,
_input: ExecInput,
) -> Result<ExecOutput, StageError> {
self.exec_outputs
.pop_front()
.unwrap_or_else(|| panic!("Test stage {} executed too many times.", self.id))
}
async fn unwind(
&mut self,
_: &mut Transaction<'_, DB>,
_input: UnwindInput,
) -> Result<UnwindOutput, StageError> {
self.unwind_outputs
.pop_front()
.unwrap_or_else(|| panic!("Test stage {} unwound too many times.", self.id))
}
}