From c25c398d34cb3bedea22067745040bce8f8b4205 Mon Sep 17 00:00:00 2001 From: Roman Krasiuk Date: Thu, 1 Jun 2023 11:58:48 +0300 Subject: [PATCH] chore(engine): ensure previous pipeline run finished (#2920) --- bin/reth/src/node/mod.rs | 2 +- crates/consensus/beacon/src/engine/mod.rs | 46 ++++++++++++++++++++--- 2 files changed, 41 insertions(+), 7 deletions(-) diff --git a/bin/reth/src/node/mod.rs b/bin/reth/src/node/mod.rs index 25d2bd5847..d77b8519ff 100644 --- a/bin/reth/src/node/mod.rs +++ b/bin/reth/src/node/mod.rs @@ -341,7 +341,7 @@ impl Command { initial_target, consensus_engine_tx, consensus_engine_rx, - ); + )?; info!(target: "reth::cli", "Consensus engine initialized"); let events = stream_select( diff --git a/crates/consensus/beacon/src/engine/mod.rs b/crates/consensus/beacon/src/engine/mod.rs index a98edbe75d..b3db6349f5 100644 --- a/crates/consensus/beacon/src/engine/mod.rs +++ b/crates/consensus/beacon/src/engine/mod.rs @@ -83,7 +83,7 @@ impl BeaconConsensusEngineHandle { /// Sends a new payload message to the beacon consensus engine and waits for a response. /// - ///See also + /// See also pub async fn new_payload( &self, payload: ExecutionPayload, @@ -200,7 +200,7 @@ where run_pipeline_continuously: bool, payload_builder: PayloadBuilderHandle, target: Option, - ) -> (Self, BeaconConsensusEngineHandle) { + ) -> Result<(Self, BeaconConsensusEngineHandle), reth_interfaces::Error> { let (to_engine, rx) = mpsc::unbounded_channel(); Self::with_channel( client, @@ -219,6 +219,16 @@ where /// Create a new instance of the [BeaconConsensusEngine] using the given channel to configure /// the [BeaconEngineMessage] communication channel. + /// + /// By default the engine is started with idle pipeline. + /// The pipeline can be launched immediately in one of the following ways descending in + /// priority: + /// - Explicit [Option::Some] target block hash provided via a constructor argument. + /// - The process was previously interrupted amidst the pipeline run. This is checked by + /// comparing the checkpoints of the first ([StageId::Headers]) and last ([StageId::Finish]) + /// stages. In this case, the latest available header in the database is used as the target. + /// + /// Propagates any database related error. #[allow(clippy::too_many_arguments)] pub fn with_channel( client: Client, @@ -232,7 +242,7 @@ where target: Option, to_engine: UnboundedSender, rx: UnboundedReceiver, - ) -> (Self, BeaconConsensusEngineHandle) { + ) -> Result<(Self, BeaconConsensusEngineHandle), reth_interfaces::Error> { let handle = BeaconConsensusEngineHandle { to_engine }; let sync = EngineSyncController::new( pipeline, @@ -254,11 +264,34 @@ where metrics: Metrics::default(), }; - if let Some(target) = target { + let maybe_pipeline_target = match target { + // Provided target always takes precedence. + target @ Some(_) => target, + None => { + // If no target was provided, check if the stages are congruent - check if the + // checkpoint of the last stage matches the checkpoint of the first. + let first_stage_checkpoint = this + .blockchain + .get_stage_checkpoint(*StageId::ALL.first().unwrap())? + .unwrap_or_default() + .block_number; + let last_stage_checkpoint = this + .blockchain + .get_stage_checkpoint(*StageId::ALL.last().unwrap())? + .unwrap_or_default() + .block_number; + if first_stage_checkpoint != last_stage_checkpoint { + this.blockchain.block_hash(first_stage_checkpoint)? + } else { + None + } + } + }; + if let Some(target) = maybe_pipeline_target { this.sync.set_pipeline_sync_target(target); } - (this, handle) + Ok((this, handle)) } /// Returns a new [`BeaconConsensusEngineHandle`] that can be cloned and shared. @@ -1186,7 +1219,8 @@ mod tests { false, payload_builder, None, - ); + ) + .expect("failed to create consensus engine"); (engine, TestEnv::new(db, tip_rx, handle)) }