From f8b42518860645ffb9f3a1c75274a515ae1b1ead Mon Sep 17 00:00:00 2001 From: Bjerg Date: Thu, 15 Dec 2022 12:48:41 +0100 Subject: [PATCH] feat: don't stop the pipeline on internal stage errs (#453) * feat: don't stop the pipeline on internal stage errs * test: add tests for pipeline err handling * chore: few notes * refactor: stage error fatal/recoverable variants * refactor: use recoverable errors in headers stage * test: adjust tests * chore: nits Co-authored-by: Georgios Konstantopoulos Co-authored-by: Georgios Konstantopoulos --- crates/stages/src/error.rs | 22 ++++++++++-- crates/stages/src/pipeline.rs | 55 +++++++++++++++++++++++++---- crates/stages/src/stages/bodies.rs | 29 +++++++-------- crates/stages/src/stages/headers.rs | 15 +++----- crates/stages/src/stages/senders.rs | 3 +- 5 files changed, 88 insertions(+), 36 deletions(-) diff --git a/crates/stages/src/error.rs b/crates/stages/src/error.rs index 550ff4845d..fc3781f12a 100644 --- a/crates/stages/src/error.rs +++ b/crates/stages/src/error.rs @@ -21,6 +21,7 @@ pub enum StageError { Database(#[from] DbError), #[error("Stage encountered a execution error in block {block}: {error}.")] /// The stage encountered a execution error + // TODO: Probably redundant, should be rolled into `Validation` ExecutionError { /// The block that failed execution. block: BlockNumber, @@ -35,9 +36,26 @@ pub enum StageError { /// rely on external downloaders #[error("Invalid download response: {0}")] Download(String), - /// The stage encountered an internal error. + /// The stage encountered a recoverable error. + /// + /// These types of errors are caught by the [Pipeline] and trigger a restart of the stage. #[error(transparent)] - Internal(Box), + Recoverable(Box), + /// The stage encountered a fatal error. + /// + /// These types of errors stop the pipeline. + #[error(transparent)] + Fatal(Box), +} + +impl StageError { + /// If the error is fatal the pipeline will stop. + pub fn is_fatal(&self) -> bool { + matches!( + self, + StageError::Database(_) | StageError::DatabaseIntegrity(_) | StageError::Fatal(_) + ) + } } /// A database integrity error. diff --git a/crates/stages/src/pipeline.rs b/crates/stages/src/pipeline.rs index aeca7bbaca..0721cd7e5c 100644 --- a/crates/stages/src/pipeline.rs +++ b/crates/stages/src/pipeline.rs @@ -255,7 +255,7 @@ impl Pipeline { } Err(err) => { self.events_sender.send(PipelineEvent::Error { stage_id }).await?; - return Err(PipelineError::Stage(StageError::Internal(err))) + return Err(PipelineError::Stage(StageError::Fatal(err))) } } } @@ -287,7 +287,7 @@ impl QueuedStage { ) -> Result { let stage_id = self.stage.id(); if self.require_tip && !state.reached_tip() { - info!("Tip not reached, skipping."); + warn!(stage = %stage_id, "Tip not reached as required by stage, skipping."); state.events_sender.send(PipelineEvent::Skipped { stage_id }).await?; // Stage requires us to reach the tip of the chain first, but we have @@ -304,7 +304,7 @@ impl QueuedStage { .zip(state.max_block) .map_or(false, |(prev_progress, target)| prev_progress >= target); if stage_reached_max_block { - info!("Stage reached maximum block, skipping."); + warn!(stage = %stage_id, "Stage reached maximum block, skipping."); state.events_sender.send(PipelineEvent::Skipped { stage_id }).await?; // We reached the maximum block, so we skip the stage @@ -323,7 +323,7 @@ impl QueuedStage { .await { Ok(out @ ExecOutput { stage_progress, done, reached_tip }) => { - debug!(stage = %stage_id, %stage_progress, %done, "Stage made progress"); + info!(stage = %stage_id, %stage_progress, %done, "Stage made progress"); stage_id.save_progress(db.deref(), stage_progress)?; state @@ -345,7 +345,7 @@ impl QueuedStage { state.events_sender.send(PipelineEvent::Error { stage_id }).await?; return if let StageError::Validation { block, error } = err { - debug!(stage = %stage_id, bad_block = %block, "Stage encountered a validation error: {error}"); + warn!(stage = %stage_id, bad_block = %block, "Stage encountered a validation error: {error}"); // We unwind because of a validation error. If the unwind itself fails, // we bail entirely, otherwise we restart the execution loop from the @@ -354,8 +354,14 @@ impl QueuedStage { target: prev_progress.unwrap_or_default(), bad_block: Some(block), }) - } else { + } else if err.is_fatal() { + error!(stage = %stage_id, "Stage encountered a fatal error: {err}."); Err(err.into()) + } else { + // On other errors we assume they are recoverable if we discard the + // transaction and run the stage again. + warn!(stage = %stage_id, "Stage encountered a non-fatal error: {err}. Retrying"); + continue } } } @@ -367,6 +373,7 @@ impl QueuedStage { mod tests { use super::*; use crate::{StageId, UnwindOutput}; + use assert_matches::assert_matches; use reth_db::mdbx::{self, test_utils, Env, EnvKind, WriteMap}; use reth_interfaces::consensus; use tokio::sync::mpsc::channel; @@ -724,6 +731,42 @@ mod tests { ); } + /// Checks that the pipeline re-runs stages on non-fatal errors and stops on fatal ones. + #[tokio::test] + async fn pipeline_error_handling() { + // Non-fatal + let db = test_utils::create_test_db(EnvKind::RW); + let result = Pipeline::>::new() + .push( + TestStage::new(StageId("NonFatal")) + .add_exec(Err(StageError::Recoverable(Box::new(std::fmt::Error)))) + .add_exec(Ok(ExecOutput { stage_progress: 10, done: true, reached_tip: true })), + false, + ) + .set_max_block(Some(10)) + .run(db) + .await; + assert_matches!(result, Ok(())); + + // Fatal + let db = test_utils::create_test_db(EnvKind::RW); + let result = Pipeline::>::new() + .push( + TestStage::new(StageId("Fatal")).add_exec(Err(StageError::DatabaseIntegrity( + DatabaseIntegrityError::BlockBody { number: 5 }, + ))), + false, + ) + .run(db) + .await; + assert_matches!( + result, + Err(PipelineError::Stage(StageError::DatabaseIntegrity( + DatabaseIntegrityError::BlockBody { number: 5 } + ))) + ); + } + mod utils { use super::*; use async_trait::async_trait; diff --git a/crates/stages/src/stages/bodies.rs b/crates/stages/src/stages/bodies.rs index 47952c0bd1..e0288fd9aa 100644 --- a/crates/stages/src/stages/bodies.rs +++ b/crates/stages/src/stages/bodies.rs @@ -16,7 +16,7 @@ use reth_primitives::{ BlockNumber, SealedHeader, }; use std::{fmt::Debug, sync::Arc}; -use tracing::{error, warn}; +use tracing::warn; const BODIES: StageId = StageId("Bodies"); @@ -111,23 +111,18 @@ impl Stage for BodyStage block, - Err(err) => { - error!( - "Encountered error downloading block {}. Details: {:?}", - highest_block + 1, - err - ); - // Exit the stage early - return Ok(ExecOutput { - stage_progress: highest_block, - done: false, - reached_tip: false, - }) - } + let Ok(block) = result else { + error!( + "Encountered an error downloading block {}: {:?}", + highest_block + 1, + result.unwrap_err() + ); + return Ok(ExecOutput { + stage_progress: highest_block, + done: false, + reached_tip: false, + }) }; - let block_number = block.number; // Write block let key = (block_number, block.hash()).into(); diff --git a/crates/stages/src/stages/headers.rs b/crates/stages/src/stages/headers.rs index 7301ff40ac..77e6b889e0 100644 --- a/crates/stages/src/stages/headers.rs +++ b/crates/stages/src/stages/headers.rs @@ -113,7 +113,7 @@ impl match e { DownloadError::Timeout => { warn!("No response for header request"); - return Ok(ExecOutput { stage_progress, reached_tip: false, done: false }) + return Err(StageError::Recoverable(DownloadError::Timeout.into())) } DownloadError::HeaderValidation { hash, error } => { error!("Validation error for header {hash}: {error}"); @@ -121,7 +121,7 @@ impl { error!(?error, "An unexpected error occurred"); - return Ok(ExecOutput { stage_progress, reached_tip: false, done: false }) + return Err(StageError::Recoverable(error.into())) } }, } @@ -279,10 +279,8 @@ mod tests { let rx = runner.execute(input); runner.consensus.update_tip(H256::from_low_u64_be(1)); let result = rx.await.unwrap(); - assert_matches!( - result, - Ok(ExecOutput { done: false, reached_tip: false, stage_progress: 100 }) - ); + // TODO: Downcast the internal error and actually check it + assert_matches!(result, Err(StageError::Recoverable(_))); assert!(runner.validate_execution(input, result.ok()).is_ok(), "validation failed"); } @@ -324,10 +322,7 @@ mod tests { // These errors are not fatal but hand back control to the pipeline let result = rx.await.unwrap(); - assert_matches!( - result, - Ok(ExecOutput { stage_progress: 1000, done: false, reached_tip: false }) - ); + assert_matches!(result, Err(StageError::Recoverable(_))); assert!(runner.validate_execution(input, result.ok()).is_ok(), "validation failed"); } diff --git a/crates/stages/src/stages/senders.rs b/crates/stages/src/stages/senders.rs index a81e96a317..26b90f6a5e 100644 --- a/crates/stages/src/stages/senders.rs +++ b/crates/stages/src/stages/senders.rs @@ -28,6 +28,7 @@ pub struct SendersStage { pub commit_threshold: u64, } +// TODO(onbjerg): Should unwind #[derive(Error, Debug)] enum SendersStageError { #[error("Sender recovery failed for transaction {tx}.")] @@ -36,7 +37,7 @@ enum SendersStageError { impl From for StageError { fn from(error: SendersStageError) -> Self { - StageError::Internal(Box::new(error)) + StageError::Fatal(Box::new(error)) } }