From 7fa16c9155fff799e70432c86f1443f4a249be64 Mon Sep 17 00:00:00 2001 From: Matthias Seitz Date: Thu, 1 Aug 2024 19:01:35 +0200 Subject: [PATCH] feat: add pipeline consistency check to launch ctx and use in launcher (#9994) --- crates/ethereum/node/src/launch.rs | 3 +- crates/node/builder/src/launch/common.rs | 67 ++++++++++++++++++++++-- 2 files changed, 65 insertions(+), 5 deletions(-) diff --git a/crates/ethereum/node/src/launch.rs b/crates/ethereum/node/src/launch.rs index c2f36f619e..d2278699e3 100644 --- a/crates/ethereum/node/src/launch.rs +++ b/crates/ethereum/node/src/launch.rs @@ -160,8 +160,6 @@ where let pipeline_events = pipeline.events(); - let initial_target = ctx.node_config().debug.tip; - let mut pruner_builder = ctx.pruner_builder(); if let Some(exex_manager_handle) = &exex_manager_handle { pruner_builder = @@ -251,6 +249,7 @@ where .await?; // Run consensus engine to completion + let initial_target = ctx.initial_backfill_target()?; let network_handle = ctx.components().network().clone(); let chainspec = ctx.chain_spec(); let (exit, rx) = oneshot::channel(); diff --git a/crates/node/builder/src/launch/common.rs b/crates/node/builder/src/launch/common.rs index 66f620e53d..21a6e7219e 100644 --- a/crates/node/builder/src/launch/common.rs +++ b/crates/node/builder/src/launch/common.rs @@ -37,13 +37,13 @@ use reth_node_metrics::{ use reth_primitives::{BlockNumber, Head, B256}; use reth_provider::{ providers::{BlockchainProvider, BlockchainProvider2, StaticFileProvider}, - CanonStateNotificationSender, FullProvider, ProviderFactory, StaticFileProviderFactory, - TreeViewer, + BlockHashReader, CanonStateNotificationSender, FullProvider, ProviderFactory, ProviderResult, + StageCheckpointReader, StaticFileProviderFactory, TreeViewer, }; use reth_prune::{PruneModes, PrunerBuilder}; use reth_rpc_builder::config::RethRpcServerConfig; use reth_rpc_layer::JwtSecret; -use reth_stages::{sets::DefaultStages, MetricEvent, Pipeline, PipelineTarget}; +use reth_stages::{sets::DefaultStages, MetricEvent, Pipeline, PipelineTarget, StageId}; use reth_static_file::StaticFileProducer; use reth_tasks::TaskExecutor; use reth_tracing::tracing::{debug, error, info, warn}; @@ -759,6 +759,67 @@ where &self.right().blockchain_db } + /// Returns the initial backfill to sync to at launch. + /// + /// This returns the configured `debug.tip` if set, otherwise it will check if backfill was + /// previously interrupted and returns the block hash of the last checkpoint, see also + /// [`Self::check_pipeline_consistency`] + pub fn initial_backfill_target(&self) -> ProviderResult> { + let mut initial_target = self.node_config().debug.tip; + + if initial_target.is_none() { + initial_target = self.check_pipeline_consistency()?; + } + + Ok(initial_target) + } + + /// Check if the pipeline is consistent (all stages have the checkpoint block numbers no less + /// than the checkpoint of the first stage). + /// + /// This will return the pipeline target if: + /// * the pipeline was interrupted during its previous run + /// * a new stage was added + /// * stage data was dropped manually through `reth stage drop ...` + /// + /// # Returns + /// + /// A target block hash if the pipeline is inconsistent, otherwise `None`. + pub fn check_pipeline_consistency(&self) -> ProviderResult> { + // If no target was provided, check if the stages are congruent - check if the + // checkpoint of the last stage matches the checkpoint of the first. + let first_stage_checkpoint = self + .blockchain_db() + .get_stage_checkpoint(*StageId::ALL.first().unwrap())? + .unwrap_or_default() + .block_number; + + // Skip the first stage as we've already retrieved it and comparing all other checkpoints + // against it. + for stage_id in StageId::ALL.iter().skip(1) { + let stage_checkpoint = self + .blockchain_db() + .get_stage_checkpoint(*stage_id)? + .unwrap_or_default() + .block_number; + + // If the checkpoint of any stage is less than the checkpoint of the first stage, + // retrieve and return the block hash of the latest header and use it as the target. + if stage_checkpoint < first_stage_checkpoint { + debug!( + target: "consensus::engine", + first_stage_checkpoint, + inconsistent_stage_id = %stage_id, + inconsistent_stage_checkpoint = stage_checkpoint, + "Pipeline sync progress is inconsistent" + ); + return self.blockchain_db().block_hash(first_stage_checkpoint) + } + } + + Ok(None) + } + /// Returns the configured `Consensus`. pub fn consensus(&self) -> Arc { self.right().consensus.clone()