feat: don't stop the pipeline on internal stage errs (#453)

* feat: don't stop the pipeline on internal stage errs

* test: add tests for pipeline err handling

* chore: few notes

* refactor: stage error fatal/recoverable variants

* refactor: use recoverable errors in headers stage

* test: adjust tests

* chore: nits

Co-authored-by: Georgios Konstantopoulos <me@gakonst.com>

Co-authored-by: Georgios Konstantopoulos <me@gakonst.com>
This commit is contained in:
Bjerg
2022-12-15 12:48:41 +01:00
committed by GitHub
parent 9dbf280bfb
commit f8b4251886
5 changed files with 88 additions and 36 deletions

View File

@@ -21,6 +21,7 @@ pub enum StageError {
Database(#[from] DbError),
#[error("Stage encountered a execution error in block {block}: {error}.")]
/// The stage encountered an execution error
// TODO: Probably redundant, should be rolled into `Validation`
ExecutionError {
/// The block that failed execution.
block: BlockNumber,
@@ -35,9 +36,26 @@ pub enum StageError {
/// rely on external downloaders
#[error("Invalid download response: {0}")]
Download(String),
/// The stage encountered an internal error.
/// The stage encountered a recoverable error.
///
/// These types of errors are caught by the [Pipeline] and trigger a restart of the stage.
#[error(transparent)]
Internal(Box<dyn std::error::Error + Send + Sync>),
Recoverable(Box<dyn std::error::Error + Send + Sync>),
/// The stage encountered a fatal error.
///
/// These types of errors stop the pipeline.
#[error(transparent)]
Fatal(Box<dyn std::error::Error + Send + Sync>),
}
impl StageError {
/// If the error is fatal the pipeline will stop.
pub fn is_fatal(&self) -> bool {
matches!(
self,
StageError::Database(_) | StageError::DatabaseIntegrity(_) | StageError::Fatal(_)
)
}
}
/// A database integrity error.

View File

@@ -255,7 +255,7 @@ impl<DB: Database> Pipeline<DB> {
}
Err(err) => {
self.events_sender.send(PipelineEvent::Error { stage_id }).await?;
return Err(PipelineError::Stage(StageError::Internal(err)))
return Err(PipelineError::Stage(StageError::Fatal(err)))
}
}
}
@@ -287,7 +287,7 @@ impl<DB: Database> QueuedStage<DB> {
) -> Result<ControlFlow, PipelineError> {
let stage_id = self.stage.id();
if self.require_tip && !state.reached_tip() {
info!("Tip not reached, skipping.");
warn!(stage = %stage_id, "Tip not reached as required by stage, skipping.");
state.events_sender.send(PipelineEvent::Skipped { stage_id }).await?;
// Stage requires us to reach the tip of the chain first, but we have
@@ -304,7 +304,7 @@ impl<DB: Database> QueuedStage<DB> {
.zip(state.max_block)
.map_or(false, |(prev_progress, target)| prev_progress >= target);
if stage_reached_max_block {
info!("Stage reached maximum block, skipping.");
warn!(stage = %stage_id, "Stage reached maximum block, skipping.");
state.events_sender.send(PipelineEvent::Skipped { stage_id }).await?;
// We reached the maximum block, so we skip the stage
@@ -323,7 +323,7 @@ impl<DB: Database> QueuedStage<DB> {
.await
{
Ok(out @ ExecOutput { stage_progress, done, reached_tip }) => {
debug!(stage = %stage_id, %stage_progress, %done, "Stage made progress");
info!(stage = %stage_id, %stage_progress, %done, "Stage made progress");
stage_id.save_progress(db.deref(), stage_progress)?;
state
@@ -345,7 +345,7 @@ impl<DB: Database> QueuedStage<DB> {
state.events_sender.send(PipelineEvent::Error { stage_id }).await?;
return if let StageError::Validation { block, error } = err {
debug!(stage = %stage_id, bad_block = %block, "Stage encountered a validation error: {error}");
warn!(stage = %stage_id, bad_block = %block, "Stage encountered a validation error: {error}");
// We unwind because of a validation error. If the unwind itself fails,
// we bail entirely, otherwise we restart the execution loop from the
@@ -354,8 +354,14 @@ impl<DB: Database> QueuedStage<DB> {
target: prev_progress.unwrap_or_default(),
bad_block: Some(block),
})
} else {
} else if err.is_fatal() {
error!(stage = %stage_id, "Stage encountered a fatal error: {err}.");
Err(err.into())
} else {
// On other errors we assume they are recoverable if we discard the
// transaction and run the stage again.
warn!(stage = %stage_id, "Stage encountered a non-fatal error: {err}. Retrying");
continue
}
}
}
@@ -367,6 +373,7 @@ impl<DB: Database> QueuedStage<DB> {
mod tests {
use super::*;
use crate::{StageId, UnwindOutput};
use assert_matches::assert_matches;
use reth_db::mdbx::{self, test_utils, Env, EnvKind, WriteMap};
use reth_interfaces::consensus;
use tokio::sync::mpsc::channel;
@@ -724,6 +731,42 @@ mod tests {
);
}
/// Verifies both error paths of the pipeline: a recoverable stage error leads to the stage
/// being executed again, while a fatal stage error is returned from `run` unchanged.
#[tokio::test]
async fn pipeline_error_handling() {
    // Recoverable case: the first execution fails with a `Recoverable` error, the second
    // one succeeds and reports the stage as done at block 10.
    let non_fatal_db = test_utils::create_test_db(EnvKind::RW);
    let retried_stage = TestStage::new(StageId("NonFatal"))
        .add_exec(Err(StageError::Recoverable(Box::new(std::fmt::Error))))
        .add_exec(Ok(ExecOutput { stage_progress: 10, done: true, reached_tip: true }));
    let non_fatal_outcome = Pipeline::<Env<WriteMap>>::new()
        .push(retried_stage, false)
        .set_max_block(Some(10))
        .run(non_fatal_db)
        .await;
    assert_matches!(non_fatal_outcome, Ok(()));

    // Fatal case: a database integrity error must surface as the pipeline's own error.
    let fatal_db = test_utils::create_test_db(EnvKind::RW);
    let failing_stage = TestStage::new(StageId("Fatal")).add_exec(Err(
        StageError::DatabaseIntegrity(DatabaseIntegrityError::BlockBody { number: 5 }),
    ));
    let fatal_outcome =
        Pipeline::<Env<WriteMap>>::new().push(failing_stage, false).run(fatal_db).await;
    assert_matches!(
        fatal_outcome,
        Err(PipelineError::Stage(StageError::DatabaseIntegrity(
            DatabaseIntegrityError::BlockBody { number: 5 }
        )))
    );
}
mod utils {
use super::*;
use async_trait::async_trait;

View File

@@ -16,7 +16,7 @@ use reth_primitives::{
BlockNumber, SealedHeader,
};
use std::{fmt::Debug, sync::Arc};
use tracing::{error, warn};
use tracing::warn;
const BODIES: StageId = StageId("Bodies");
@@ -111,23 +111,18 @@ impl<DB: Database, D: BodyDownloader, C: Consensus> Stage<DB> for BodyStage<D, C
let mut bodies_stream = self.downloader.bodies_stream(bodies_to_download.iter());
let mut highest_block = previous_block;
while let Some(result) = bodies_stream.next().await {
let block = match result {
Ok(block) => block,
Err(err) => {
error!(
"Encountered error downloading block {}. Details: {:?}",
highest_block + 1,
err
);
// Exit the stage early
return Ok(ExecOutput {
stage_progress: highest_block,
done: false,
reached_tip: false,
})
}
let Ok(block) = result else {
error!(
"Encountered an error downloading block {}: {:?}",
highest_block + 1,
result.unwrap_err()
);
return Ok(ExecOutput {
stage_progress: highest_block,
done: false,
reached_tip: false,
})
};
let block_number = block.number;
// Write block
let key = (block_number, block.hash()).into();

View File

@@ -113,7 +113,7 @@ impl<DB: Database, D: HeaderDownloader, C: Consensus, H: HeadersClient, S: Statu
Err(e) => match e {
DownloadError::Timeout => {
warn!("No response for header request");
return Ok(ExecOutput { stage_progress, reached_tip: false, done: false })
return Err(StageError::Recoverable(DownloadError::Timeout.into()))
}
DownloadError::HeaderValidation { hash, error } => {
error!("Validation error for header {hash}: {error}");
@@ -121,7 +121,7 @@ impl<DB: Database, D: HeaderDownloader, C: Consensus, H: HeadersClient, S: Statu
}
error => {
error!(?error, "An unexpected error occurred");
return Ok(ExecOutput { stage_progress, reached_tip: false, done: false })
return Err(StageError::Recoverable(error.into()))
}
},
}
@@ -279,10 +279,8 @@ mod tests {
let rx = runner.execute(input);
runner.consensus.update_tip(H256::from_low_u64_be(1));
let result = rx.await.unwrap();
assert_matches!(
result,
Ok(ExecOutput { done: false, reached_tip: false, stage_progress: 100 })
);
// TODO: Downcast the internal error and actually check it
assert_matches!(result, Err(StageError::Recoverable(_)));
assert!(runner.validate_execution(input, result.ok()).is_ok(), "validation failed");
}
@@ -324,10 +322,7 @@ mod tests {
// These errors are not fatal but hand back control to the pipeline
let result = rx.await.unwrap();
assert_matches!(
result,
Ok(ExecOutput { stage_progress: 1000, done: false, reached_tip: false })
);
assert_matches!(result, Err(StageError::Recoverable(_)));
assert!(runner.validate_execution(input, result.ok()).is_ok(), "validation failed");
}

View File

@@ -28,6 +28,7 @@ pub struct SendersStage {
pub commit_threshold: u64,
}
// TODO(onbjerg): Should unwind
#[derive(Error, Debug)]
enum SendersStageError {
#[error("Sender recovery failed for transaction {tx}.")]
@@ -36,7 +37,7 @@ enum SendersStageError {
impl From<SendersStageError> for StageError {
fn from(error: SendersStageError) -> Self {
StageError::Internal(Box::new(error))
StageError::Fatal(Box::new(error))
}
}