diff --git a/crates/consensus/beacon/src/engine/error.rs b/crates/consensus/beacon/src/engine/error.rs index d382f805d9..89280cbf23 100644 --- a/crates/consensus/beacon/src/engine/error.rs +++ b/crates/consensus/beacon/src/engine/error.rs @@ -39,16 +39,30 @@ impl From for BeaconConsensusEngineError { /// /// This represents all possible error cases, that must be returned as JSON RCP errors back to the /// beacon node. -#[derive(Debug, Clone, Copy, Eq, PartialEq, thiserror::Error)] +#[derive(Debug, thiserror::Error)] pub enum BeaconForkChoiceUpdateError { /// Thrown when a forkchoice update resulted in an error. #[error("Forkchoice update error: {0}")] ForkchoiceUpdateError(#[from] ForkchoiceUpdateError), + /// Internal errors, for example, error while reading from the database. + #[error(transparent)] + Internal(Box), /// Thrown when the engine task is unavailable/stopped. #[error("beacon consensus engine task stopped")] EngineUnavailable, } +impl From for BeaconForkChoiceUpdateError { + fn from(e: reth_interfaces::Error) -> Self { + Self::Internal(Box::new(e)) + } +} +impl From for BeaconForkChoiceUpdateError { + fn from(e: reth_interfaces::db::Error) -> Self { + Self::Internal(Box::new(e.into())) + } +} + /// Represents all error cases when handling a new payload. /// /// This represents all possible error cases that must be returned as JSON RCP errors back to the diff --git a/crates/consensus/beacon/src/engine/message.rs b/crates/consensus/beacon/src/engine/message.rs index e8f0c16e23..fdbee0586c 100644 --- a/crates/consensus/beacon/src/engine/message.rs +++ b/crates/consensus/beacon/src/engine/message.rs @@ -19,6 +19,7 @@ use tokio::sync::{mpsc::UnboundedSender, oneshot}; #[must_use = "futures do nothing unless you `.await` or poll them"] #[derive(Debug)] pub struct OnForkChoiceUpdated { + /// Tracks if this update was valid. is_valid_update: bool, /// Returns the result of the forkchoice update. fut: Either, PendingPayloadId>, @@ -129,7 +130,7 @@ pub enum BeaconEngineMessage { /// The payload attributes for block building. payload_attrs: Option, /// The sender for returning forkchoice updated result. - tx: oneshot::Sender, + tx: oneshot::Sender>, }, /// Add a new listener for [`BeaconEngineMessage`]. EventListener(UnboundedSender), diff --git a/crates/consensus/beacon/src/engine/mod.rs b/crates/consensus/beacon/src/engine/mod.rs index 135cc80865..a8b7d2c24d 100644 --- a/crates/consensus/beacon/src/engine/mod.rs +++ b/crates/consensus/beacon/src/engine/mod.rs @@ -94,7 +94,7 @@ impl BeaconConsensusEngineHandle { Ok(self .send_fork_choice_updated(state, payload_attrs) .map_err(|_| BeaconForkChoiceUpdateError::EngineUnavailable) - .await? + .await?? .await?) } @@ -104,7 +104,7 @@ impl BeaconConsensusEngineHandle { &self, state: ForkchoiceState, payload_attrs: Option, - ) -> oneshot::Receiver { + ) -> oneshot::Receiver> { let (tx, rx) = oneshot::channel(); let _ = self.to_engine.send(BeaconEngineMessage::ForkchoiceUpdated { state, @@ -307,16 +307,53 @@ where Some(status) } + /// Invoked when we receive a new forkchoice update message. + /// + /// Returns `true` if the engine now reached its maximum block number, See + /// [EngineSyncController::has_reached_max_block]. + fn on_forkchoice_updated( + &mut self, + state: ForkchoiceState, + attrs: Option, + tx: oneshot::Sender>, + ) -> bool { + self.metrics.forkchoice_updated_messages.increment(1); + + let on_updated = match self.forkchoice_updated(state, attrs) { + Ok(response) => response, + Err(error) => { + let _ = tx.send(Err(error)); + return false + } + }; + + let is_valid_response = on_updated.is_valid_update(); + let _ = tx.send(Ok(on_updated)); + + // Terminate the sync early if it's reached the maximum user + // configured block. + if is_valid_response { + let tip_number = self.blockchain.canonical_tip().number; + if self.sync.has_reached_max_block(tip_number) { + return true + } + } + + false + } + /// Called to resolve chain forks and ensure that the Execution layer is working with the latest /// valid chain. /// /// These responses should adhere to the [Engine API Spec for /// `engine_forkchoiceUpdated`](https://github.com/ethereum/execution-apis/blob/main/src/engine/paris.md#specification-1). - fn on_forkchoice_updated( + /// + /// Returns an error if an internal error occurred like a database error. + fn forkchoice_updated( &mut self, state: ForkchoiceState, attrs: Option, - ) -> Result { + ) -> Result { trace!(target: "consensus::engine", ?state, "Received new forkchoice state update"); if state.head_block_hash.is_zero() { return Ok(OnForkChoiceUpdated::invalid_state()) @@ -373,7 +410,7 @@ where if let Error::Execution(ref err) = error { if err.is_fatal() { tracing::error!(target: "consensus::engine", ?err, "Encountered fatal error"); - return Err(BeaconConsensusEngineError::Common(error)) + return Err(error) } } @@ -393,10 +430,7 @@ where /// Sets the state of the canon chain tracker based on the given forkchoice update. /// /// This should be called before issuing a VALID forkchoice update. - fn update_canon_chain( - &self, - update: &ForkchoiceState, - ) -> Result<(), BeaconConsensusEngineError> { + fn update_canon_chain(&self, update: &ForkchoiceState) -> Result<(), reth_interfaces::Error> { if !update.finalized_block_hash.is_zero() { let finalized = self .blockchain @@ -436,7 +470,7 @@ where /// /// If the newest head is not invalid, then this will trigger a new pipeline run to sync the gap /// - /// See [Self::on_forkchoice_updated] and [BlockchainTreeEngine::make_canonical]. + /// See [Self::forkchoice_updated] and [BlockchainTreeEngine::make_canonical]. fn on_failed_canonical_forkchoice_update( &mut self, state: &ForkchoiceState, @@ -778,24 +812,8 @@ where while let Poll::Ready(Some(msg)) = this.engine_message_rx.poll_next_unpin(cx) { match msg { BeaconEngineMessage::ForkchoiceUpdated { state, payload_attrs, tx } => { - this.metrics.forkchoice_updated_messages.increment(1); - let on_updated = match this.on_forkchoice_updated(state, payload_attrs) { - Ok(response) => response, - Err(error) => { - error!(target: "consensus::engine", ?state, ?error, "Error getting forkchoice updated response"); - return Poll::Ready(Err(error)) - } - }; - let is_valid_response = on_updated.is_valid_update(); - let _ = tx.send(on_updated); - - // Terminate the sync early if it's reached the maximum user - // configured block. - if is_valid_response { - let tip_number = this.blockchain.canonical_tip().number; - if this.sync.has_reached_max_block(tip_number) { - return Poll::Ready(Ok(())) - } + if this.on_forkchoice_updated(state, payload_attrs, tx) { + return Poll::Ready(Ok(())) } } BeaconEngineMessage::NewPayload { payload, tx } => { @@ -1163,7 +1181,7 @@ mod tests { let mut engine_rx = spawn_consensus_engine(consensus_engine); let res = env.send_forkchoice_updated(ForkchoiceState::default()).await; - assert_eq!( + assert_matches!( res, Err(BeaconForkChoiceUpdateError::ForkchoiceUpdateError( ForkchoiceUpdateError::InvalidState diff --git a/crates/rpc/rpc-engine-api/src/error.rs b/crates/rpc/rpc-engine-api/src/error.rs index 03c55c1a0e..8a87b3f313 100644 --- a/crates/rpc/rpc-engine-api/src/error.rs +++ b/crates/rpc/rpc-engine-api/src/error.rs @@ -88,9 +88,10 @@ impl From for jsonrpsee_types::error::ErrorObject<'static> { EngineApiError::PayloadRequestTooLarge { .. } => REQUEST_TOO_LARGE_CODE, // Error responses from the consensus engine - EngineApiError::ForkChoiceUpdate(err) => match err { - BeaconForkChoiceUpdateError::ForkchoiceUpdateError(err) => return err.into(), + EngineApiError::ForkChoiceUpdate(ref err) => match err { + BeaconForkChoiceUpdateError::ForkchoiceUpdateError(err) => return (*err).into(), BeaconForkChoiceUpdateError::EngineUnavailable => INTERNAL_ERROR_CODE, + BeaconForkChoiceUpdateError::Internal(_) => INTERNAL_ERROR_CODE, }, EngineApiError::NewPayload(ref err) => match err { BeaconOnNewPayloadError::EngineUnavailable => INTERNAL_ERROR_CODE,