fix(stages): call post execute/unwind commit hooks (#11413)

This commit is contained in:
Alexey Shekhirin
2024-10-02 13:24:05 +03:00
committed by GitHub
parent d47904f7cd
commit 82cc96d0c0
2 changed files with 93 additions and 26 deletions

View File

@@ -463,6 +463,8 @@ impl<N: ProviderNodeTypes> Pipeline<N> {
self.provider_factory.static_file_provider(),
)?;
stage.post_execute_commit()?;
if done {
let block_number = checkpoint.block_number;
return Ok(if made_progress {
@@ -586,6 +588,8 @@ impl<N: ProviderNodeTypes> std::fmt::Debug for Pipeline<N> {
#[cfg(test)]
mod tests {
use std::sync::atomic::Ordering;
use super::*;
use crate::{test_utils::TestStage, UnwindOutput};
use assert_matches::assert_matches;
@@ -628,15 +632,19 @@ mod tests {
async fn run_pipeline() {
let provider_factory = create_test_provider_factory();
let stage_a = TestStage::new(StageId::Other("A"))
.add_exec(Ok(ExecOutput { checkpoint: StageCheckpoint::new(20), done: true }));
let (stage_a, post_execute_commit_counter_a) = stage_a.with_post_execute_commit_counter();
let (stage_a, post_unwind_commit_counter_a) = stage_a.with_post_unwind_commit_counter();
let stage_b = TestStage::new(StageId::Other("B"))
.add_exec(Ok(ExecOutput { checkpoint: StageCheckpoint::new(10), done: true }));
let (stage_b, post_execute_commit_counter_b) = stage_b.with_post_execute_commit_counter();
let (stage_b, post_unwind_commit_counter_b) = stage_b.with_post_unwind_commit_counter();
let mut pipeline = Pipeline::<MockNodeTypesWithDB>::builder()
.add_stage(
TestStage::new(StageId::Other("A"))
.add_exec(Ok(ExecOutput { checkpoint: StageCheckpoint::new(20), done: true })),
)
.add_stage(
TestStage::new(StageId::Other("B"))
.add_exec(Ok(ExecOutput { checkpoint: StageCheckpoint::new(10), done: true })),
)
.add_stage(stage_a)
.add_stage(stage_b)
.with_max_block(10)
.build(
provider_factory.clone(),
@@ -689,6 +697,12 @@ mod tests {
},
]
);
assert_eq!(post_execute_commit_counter_a.load(Ordering::Relaxed), 1);
assert_eq!(post_unwind_commit_counter_a.load(Ordering::Relaxed), 0);
assert_eq!(post_execute_commit_counter_b.load(Ordering::Relaxed), 1);
assert_eq!(post_unwind_commit_counter_b.load(Ordering::Relaxed), 0);
}
/// Unwinds a simple pipeline.
@@ -696,22 +710,28 @@ mod tests {
async fn unwind_pipeline() {
let provider_factory = create_test_provider_factory();
let stage_a = TestStage::new(StageId::Other("A"))
.add_exec(Ok(ExecOutput { checkpoint: StageCheckpoint::new(100), done: true }))
.add_unwind(Ok(UnwindOutput { checkpoint: StageCheckpoint::new(1) }));
let (stage_a, post_execute_commit_counter_a) = stage_a.with_post_execute_commit_counter();
let (stage_a, post_unwind_commit_counter_a) = stage_a.with_post_unwind_commit_counter();
let stage_b = TestStage::new(StageId::Other("B"))
.add_exec(Ok(ExecOutput { checkpoint: StageCheckpoint::new(10), done: true }))
.add_unwind(Ok(UnwindOutput { checkpoint: StageCheckpoint::new(1) }));
let (stage_b, post_execute_commit_counter_b) = stage_b.with_post_execute_commit_counter();
let (stage_b, post_unwind_commit_counter_b) = stage_b.with_post_unwind_commit_counter();
let stage_c = TestStage::new(StageId::Other("C"))
.add_exec(Ok(ExecOutput { checkpoint: StageCheckpoint::new(20), done: true }))
.add_unwind(Ok(UnwindOutput { checkpoint: StageCheckpoint::new(1) }));
let (stage_c, post_execute_commit_counter_c) = stage_c.with_post_execute_commit_counter();
let (stage_c, post_unwind_commit_counter_c) = stage_c.with_post_unwind_commit_counter();
let mut pipeline = Pipeline::<MockNodeTypesWithDB>::builder()
.add_stage(
TestStage::new(StageId::Other("A"))
.add_exec(Ok(ExecOutput { checkpoint: StageCheckpoint::new(100), done: true }))
.add_unwind(Ok(UnwindOutput { checkpoint: StageCheckpoint::new(1) })),
)
.add_stage(
TestStage::new(StageId::Other("B"))
.add_exec(Ok(ExecOutput { checkpoint: StageCheckpoint::new(10), done: true }))
.add_unwind(Ok(UnwindOutput { checkpoint: StageCheckpoint::new(1) })),
)
.add_stage(
TestStage::new(StageId::Other("C"))
.add_exec(Ok(ExecOutput { checkpoint: StageCheckpoint::new(20), done: true }))
.add_unwind(Ok(UnwindOutput { checkpoint: StageCheckpoint::new(1) })),
)
.add_stage(stage_a)
.add_stage(stage_b)
.add_stage(stage_c)
.with_max_block(10)
.build(
provider_factory.clone(),
@@ -823,6 +843,15 @@ mod tests {
},
]
);
assert_eq!(post_execute_commit_counter_a.load(Ordering::Relaxed), 1);
assert_eq!(post_unwind_commit_counter_a.load(Ordering::Relaxed), 1);
assert_eq!(post_execute_commit_counter_b.load(Ordering::Relaxed), 1);
assert_eq!(post_unwind_commit_counter_b.load(Ordering::Relaxed), 1);
assert_eq!(post_execute_commit_counter_c.load(Ordering::Relaxed), 1);
assert_eq!(post_unwind_commit_counter_c.load(Ordering::Relaxed), 1);
}
/// Unwinds a pipeline with intermediate progress.

View File

@@ -1,7 +1,13 @@
#![allow(missing_docs)]
use crate::{ExecInput, ExecOutput, Stage, StageError, StageId, UnwindInput, UnwindOutput};
use std::collections::VecDeque;
use std::{
collections::VecDeque,
sync::{
atomic::{AtomicUsize, Ordering},
Arc,
},
};
/// A test stage that can be used for testing.
///
@@ -11,11 +17,19 @@ pub struct TestStage {
id: StageId,
exec_outputs: VecDeque<Result<ExecOutput, StageError>>,
unwind_outputs: VecDeque<Result<UnwindOutput, StageError>>,
post_execute_commit_counter: Arc<AtomicUsize>,
post_unwind_commit_counter: Arc<AtomicUsize>,
}
impl TestStage {
pub const fn new(id: StageId) -> Self {
Self { id, exec_outputs: VecDeque::new(), unwind_outputs: VecDeque::new() }
pub fn new(id: StageId) -> Self {
Self {
id,
exec_outputs: VecDeque::new(),
unwind_outputs: VecDeque::new(),
post_execute_commit_counter: Arc::new(AtomicUsize::new(0)),
post_unwind_commit_counter: Arc::new(AtomicUsize::new(0)),
}
}
pub fn with_exec(mut self, exec_outputs: VecDeque<Result<ExecOutput, StageError>>) -> Self {
@@ -40,6 +54,18 @@ impl TestStage {
self.unwind_outputs.push_back(output);
self
}
pub fn with_post_execute_commit_counter(mut self) -> (Self, Arc<AtomicUsize>) {
let counter = Arc::new(AtomicUsize::new(0));
self.post_execute_commit_counter = counter.clone();
(self, counter)
}
pub fn with_post_unwind_commit_counter(mut self) -> (Self, Arc<AtomicUsize>) {
let counter = Arc::new(AtomicUsize::new(0));
self.post_unwind_commit_counter = counter.clone();
(self, counter)
}
}
impl<Provider> Stage<Provider> for TestStage {
@@ -53,9 +79,21 @@ impl<Provider> Stage<Provider> for TestStage {
.unwrap_or_else(|| panic!("Test stage {} executed too many times.", self.id))
}
fn post_execute_commit(&mut self) -> Result<(), StageError> {
self.post_execute_commit_counter.fetch_add(1, Ordering::Relaxed);
Ok(())
}
fn unwind(&mut self, _: &Provider, _input: UnwindInput) -> Result<UnwindOutput, StageError> {
self.unwind_outputs
.pop_front()
.unwrap_or_else(|| panic!("Test stage {} unwound too many times.", self.id))
}
fn post_unwind_commit(&mut self) -> Result<(), StageError> {
self.post_unwind_commit_counter.fetch_add(1, Ordering::Relaxed);
Ok(())
}
}