mirror of
https://github.com/paradigmxyz/reth.git
synced 2026-01-28 08:37:59 -05:00
feat: add pipeline consistency check to launch ctx and use in launcher (#9994)
This commit is contained in:
@@ -160,8 +160,6 @@ where
|
||||
|
||||
let pipeline_events = pipeline.events();
|
||||
|
||||
let initial_target = ctx.node_config().debug.tip;
|
||||
|
||||
let mut pruner_builder = ctx.pruner_builder();
|
||||
if let Some(exex_manager_handle) = &exex_manager_handle {
|
||||
pruner_builder =
|
||||
@@ -251,6 +249,7 @@ where
|
||||
.await?;
|
||||
|
||||
// Run consensus engine to completion
|
||||
let initial_target = ctx.initial_backfill_target()?;
|
||||
let network_handle = ctx.components().network().clone();
|
||||
let chainspec = ctx.chain_spec();
|
||||
let (exit, rx) = oneshot::channel();
|
||||
|
||||
@@ -37,13 +37,13 @@ use reth_node_metrics::{
|
||||
use reth_primitives::{BlockNumber, Head, B256};
|
||||
use reth_provider::{
|
||||
providers::{BlockchainProvider, BlockchainProvider2, StaticFileProvider},
|
||||
CanonStateNotificationSender, FullProvider, ProviderFactory, StaticFileProviderFactory,
|
||||
TreeViewer,
|
||||
BlockHashReader, CanonStateNotificationSender, FullProvider, ProviderFactory, ProviderResult,
|
||||
StageCheckpointReader, StaticFileProviderFactory, TreeViewer,
|
||||
};
|
||||
use reth_prune::{PruneModes, PrunerBuilder};
|
||||
use reth_rpc_builder::config::RethRpcServerConfig;
|
||||
use reth_rpc_layer::JwtSecret;
|
||||
use reth_stages::{sets::DefaultStages, MetricEvent, Pipeline, PipelineTarget};
|
||||
use reth_stages::{sets::DefaultStages, MetricEvent, Pipeline, PipelineTarget, StageId};
|
||||
use reth_static_file::StaticFileProducer;
|
||||
use reth_tasks::TaskExecutor;
|
||||
use reth_tracing::tracing::{debug, error, info, warn};
|
||||
@@ -759,6 +759,67 @@ where
|
||||
&self.right().blockchain_db
|
||||
}
|
||||
|
||||
/// Returns the initial backfill to sync to at launch.
|
||||
///
|
||||
/// This returns the configured `debug.tip` if set, otherwise it will check if backfill was
|
||||
/// previously interrupted and returns the block hash of the last checkpoint, see also
|
||||
/// [`Self::check_pipeline_consistency`]
|
||||
pub fn initial_backfill_target(&self) -> ProviderResult<Option<B256>> {
|
||||
let mut initial_target = self.node_config().debug.tip;
|
||||
|
||||
if initial_target.is_none() {
|
||||
initial_target = self.check_pipeline_consistency()?;
|
||||
}
|
||||
|
||||
Ok(initial_target)
|
||||
}
|
||||
|
||||
/// Check if the pipeline is consistent (all stages have the checkpoint block numbers no less
|
||||
/// than the checkpoint of the first stage).
|
||||
///
|
||||
/// This will return the pipeline target if:
|
||||
/// * the pipeline was interrupted during its previous run
|
||||
/// * a new stage was added
|
||||
/// * stage data was dropped manually through `reth stage drop ...`
|
||||
///
|
||||
/// # Returns
|
||||
///
|
||||
/// A target block hash if the pipeline is inconsistent, otherwise `None`.
|
||||
pub fn check_pipeline_consistency(&self) -> ProviderResult<Option<B256>> {
|
||||
// If no target was provided, check if the stages are congruent - check if the
|
||||
// checkpoint of the last stage matches the checkpoint of the first.
|
||||
let first_stage_checkpoint = self
|
||||
.blockchain_db()
|
||||
.get_stage_checkpoint(*StageId::ALL.first().unwrap())?
|
||||
.unwrap_or_default()
|
||||
.block_number;
|
||||
|
||||
// Skip the first stage as we've already retrieved it and comparing all other checkpoints
|
||||
// against it.
|
||||
for stage_id in StageId::ALL.iter().skip(1) {
|
||||
let stage_checkpoint = self
|
||||
.blockchain_db()
|
||||
.get_stage_checkpoint(*stage_id)?
|
||||
.unwrap_or_default()
|
||||
.block_number;
|
||||
|
||||
// If the checkpoint of any stage is less than the checkpoint of the first stage,
|
||||
// retrieve and return the block hash of the latest header and use it as the target.
|
||||
if stage_checkpoint < first_stage_checkpoint {
|
||||
debug!(
|
||||
target: "consensus::engine",
|
||||
first_stage_checkpoint,
|
||||
inconsistent_stage_id = %stage_id,
|
||||
inconsistent_stage_checkpoint = stage_checkpoint,
|
||||
"Pipeline sync progress is inconsistent"
|
||||
);
|
||||
return self.blockchain_db().block_hash(first_stage_checkpoint)
|
||||
}
|
||||
}
|
||||
|
||||
Ok(None)
|
||||
}
|
||||
|
||||
/// Returns the configured `Consensus`.
|
||||
pub fn consensus(&self) -> Arc<dyn Consensus> {
|
||||
self.right().consensus.clone()
|
||||
|
||||
Reference in New Issue
Block a user