diff --git a/crates/consensus/beacon/src/engine/mod.rs b/crates/consensus/beacon/src/engine/mod.rs index a1f8508f05..344d796a78 100644 --- a/crates/consensus/beacon/src/engine/mod.rs +++ b/crates/consensus/beacon/src/engine/mod.rs @@ -12,7 +12,7 @@ use reth_provider::ExecutorFactory; use reth_rpc_types::engine::{ ExecutionPayload, ForkchoiceUpdated, PayloadAttributes, PayloadStatus, PayloadStatusEnum, }; -use reth_stages::Pipeline; +use reth_stages::{stages::FINISH, Pipeline}; use reth_tasks::TaskSpawner; use std::{ pin::Pin, @@ -131,19 +131,34 @@ where &mut self, state: ForkchoiceState, _attrs: Option, - ) -> ForkchoiceUpdated { + ) -> Result { trace!(target: "consensus::engine", ?state, "Received new forkchoice state"); if state.head_block_hash.is_zero() { - return ForkchoiceUpdated::new(PayloadStatus::from_status(PayloadStatusEnum::Invalid { - validation_error: BeaconEngineError::ForkchoiceEmptyHead.to_string(), - })) + return Ok(ForkchoiceUpdated::new(PayloadStatus::from_status( + PayloadStatusEnum::Invalid { + validation_error: BeaconEngineError::ForkchoiceEmptyHead.to_string(), + }, + ))) } let is_first_forkchoice = self.forkchoice_state.is_none(); self.forkchoice_state = Some(state); let status = if self.is_pipeline_idle() { match self.blockchain_tree.make_canonical(&state.head_block_hash) { - Ok(_) => PayloadStatus::from_status(PayloadStatusEnum::Valid), + Ok(_) => { + let head_block_number = self + .db + .view(|tx| tx.get::(state.head_block_hash))?? + .expect("was canonicalized, so it exists"); + let pipeline_min_progress = + FINISH.get_progress(&self.db.tx()?)?.unwrap_or_default(); + + if pipeline_min_progress < head_block_number { + self.require_pipeline_run(PipelineTarget::Head); + } + + PayloadStatus::from_status(PayloadStatusEnum::Valid) + } Err(error) => { warn!(target: "consensus::engine", ?state, ?error, "Error canonicalizing the head hash"); // If this is the first forkchoice received, start downloading from safe block @@ -169,7 +184,7 @@ where PayloadStatus::from_status(PayloadStatusEnum::Syncing) }; trace!(target: "consensus::engine", ?state, ?status, "Returning forkchoice status"); - ForkchoiceUpdated::new(status) + Ok(ForkchoiceUpdated::new(status)) } /// When the Consensus layer receives a new block via the consensus gossip protocol, @@ -309,7 +324,13 @@ where while let Poll::Ready(Some(msg)) = this.message_rx.poll_next_unpin(cx) { match msg { BeaconEngineMessage::ForkchoiceUpdated { state, payload_attrs, tx } => { - let response = this.on_forkchoice_updated(state, payload_attrs); + let response = match this.on_forkchoice_updated(state, payload_attrs) { + Ok(response) => response, + Err(error) => { + error!(target: "consensus::engine", ?state, ?error, "Error getting forkchoice updated response"); + return Poll::Ready(Err(error.into())) + } + }; let is_valid_response = matches!(response.payload_status.status, PayloadStatusEnum::Valid); let _ = tx.send(Ok(response)); @@ -722,6 +743,7 @@ mod tests { let genesis = random_block(0, None, None, Some(0)); let block1 = random_block(1, Some(genesis.hash), None, Some(0)); insert_blocks(env.db.as_ref(), [&genesis, &block1].into_iter()); + env.db.update(|tx| FINISH.save_progress(tx, block1.number)).unwrap().unwrap(); let mut engine_rx = spawn_consensus_engine(consensus_engine);