From a73e6c8ca8ac55baced3e2a5c7f8f8f65940a930 Mon Sep 17 00:00:00 2001 From: Matthias Seitz Date: Mon, 15 May 2023 14:27:22 +0200 Subject: [PATCH] chore: cleanup engine api errors (#2674) --- crates/consensus/beacon/src/engine/error.rs | 52 +++++++++---------- crates/consensus/beacon/src/engine/message.rs | 4 +- crates/consensus/beacon/src/engine/mod.rs | 41 +++++++++------ crates/rpc/rpc-engine-api/src/engine_api.rs | 22 ++++---- crates/rpc/rpc-engine-api/src/error.rs | 49 +++++++---------- .../rpc-types/src/eth/engine/forkchoice.rs | 15 +++++- .../rpc/rpc-types/src/eth/engine/payload.rs | 7 ++- 7 files changed, 101 insertions(+), 89 deletions(-) diff --git a/crates/consensus/beacon/src/engine/error.rs b/crates/consensus/beacon/src/engine/error.rs index 58b7a22668..d382f805d9 100644 --- a/crates/consensus/beacon/src/engine/error.rs +++ b/crates/consensus/beacon/src/engine/error.rs @@ -1,61 +1,61 @@ -use reth_payload_builder::error::PayloadBuilderError; -use reth_rpc_types::engine::{EngineRpcError, ForkchoiceUpdateError, PayloadError}; +use reth_rpc_types::engine::ForkchoiceUpdateError; use reth_stages::PipelineError; -use thiserror::Error; /// Beacon engine result. -pub type BeaconEngineResult = Result; +pub type BeaconEngineResult = Result; -/// The error wrapper for the beacon consensus engine. -#[derive(Error, Debug)] -pub enum BeaconEngineError { - /// Forkchoice zero hash head received. - #[error("Received zero hash as forkchoice head")] - ForkchoiceEmptyHead, +/// The error type for the beacon consensus engine service +/// [BeaconConsensusEngine](crate::BeaconConsensusEngine) +/// +/// Represents all possible error cases for the beacon consensus engine. +#[derive(Debug, thiserror::Error)] +pub enum BeaconConsensusEngineError { /// Pipeline channel closed. #[error("Pipeline channel closed")] PipelineChannelClosed, - /// An error covered by the engine API standard error codes. - #[error(transparent)] - EngineApi(#[from] EngineRpcError), - /// Encountered a payload error. - #[error(transparent)] - Payload(#[from] PayloadError), - /// Encountered an error during the payload building process. - #[error(transparent)] - PayloadBuilderError(#[from] PayloadBuilderError), /// Pipeline error. #[error(transparent)] Pipeline(#[from] Box), /// Common error. Wrapper around [reth_interfaces::Error]. #[error(transparent)] Common(#[from] reth_interfaces::Error), - /// Thrown when the engine task stopped - #[error("beacon consensus engine task stopped")] - EngineUnavailable, } // box the pipeline error as it is a large enum. -impl From for BeaconEngineError { +impl From for BeaconConsensusEngineError { fn from(e: PipelineError) -> Self { Self::Pipeline(Box::new(e)) } } // for convenience in the beacon engine -impl From for BeaconEngineError { +impl From for BeaconConsensusEngineError { fn from(e: reth_interfaces::db::Error) -> Self { Self::Common(e.into()) } } /// Represents error cases for an applied forkchoice update. -#[derive(Error, Debug, Eq, PartialEq)] +/// +/// This represents all possible error cases, that must be returned as JSON RCP errors back to the +/// beacon node. +#[derive(Debug, Clone, Copy, Eq, PartialEq, thiserror::Error)] pub enum BeaconForkChoiceUpdateError { /// Thrown when a forkchoice update resulted in an error. #[error("Forkchoice update error: {0}")] ForkchoiceUpdateError(#[from] ForkchoiceUpdateError), - /// Thrown when the engine task stopped + /// Thrown when the engine task is unavailable/stopped. + #[error("beacon consensus engine task stopped")] + EngineUnavailable, +} + +/// Represents all error cases when handling a new payload. +/// +/// This represents all possible error cases that must be returned as JSON RCP errors back to the +/// beacon node. +#[derive(Debug, Clone, Eq, PartialEq, thiserror::Error)] +pub enum BeaconOnNewPayloadError { + /// Thrown when the engine task is unavailable/stopped. #[error("beacon consensus engine task stopped")] EngineUnavailable, } diff --git a/crates/consensus/beacon/src/engine/message.rs b/crates/consensus/beacon/src/engine/message.rs index fabc7c789e..e8f0c16e23 100644 --- a/crates/consensus/beacon/src/engine/message.rs +++ b/crates/consensus/beacon/src/engine/message.rs @@ -1,4 +1,4 @@ -use crate::{BeaconConsensusEngineEvent, BeaconEngineResult}; +use crate::{engine::error::BeaconOnNewPayloadError, BeaconConsensusEngineEvent}; use futures::{future::Either, FutureExt}; use reth_interfaces::consensus::ForkchoiceState; use reth_payload_builder::error::PayloadBuilderError; @@ -120,7 +120,7 @@ pub enum BeaconEngineMessage { /// The execution payload received by Engine API. payload: ExecutionPayload, /// The sender for returning payload status result. - tx: oneshot::Sender>, + tx: oneshot::Sender>, }, /// Message with updated forkchoice state. ForkchoiceUpdated { diff --git a/crates/consensus/beacon/src/engine/mod.rs b/crates/consensus/beacon/src/engine/mod.rs index 9f4d0b5493..135cc80865 100644 --- a/crates/consensus/beacon/src/engine/mod.rs +++ b/crates/consensus/beacon/src/engine/mod.rs @@ -39,7 +39,10 @@ mod message; pub use message::BeaconEngineMessage; mod error; -pub use error::{BeaconEngineError, BeaconEngineResult, BeaconForkChoiceUpdateError}; +pub use error::{ + BeaconConsensusEngineError, BeaconEngineResult, BeaconForkChoiceUpdateError, + BeaconOnNewPayloadError, +}; mod metrics; @@ -74,10 +77,10 @@ impl BeaconConsensusEngineHandle { pub async fn new_payload( &self, payload: ExecutionPayload, - ) -> BeaconEngineResult { + ) -> Result { let (tx, rx) = oneshot::channel(); let _ = self.to_engine.send(BeaconEngineMessage::NewPayload { payload, tx }); - rx.await.map_err(|_| BeaconEngineError::EngineUnavailable)? + rx.await.map_err(|_| BeaconOnNewPayloadError::EngineUnavailable)? } /// Sends a forkchoice update message to the beacon consensus engine and waits for a response. @@ -313,7 +316,7 @@ where &mut self, state: ForkchoiceState, attrs: Option, - ) -> Result { + ) -> Result { trace!(target: "consensus::engine", ?state, "Received new forkchoice state update"); if state.head_block_hash.is_zero() { return Ok(OnForkChoiceUpdated::invalid_state()) @@ -370,7 +373,7 @@ where if let Error::Execution(ref err) = error { if err.is_fatal() { tracing::error!(target: "consensus::engine", ?err, "Encountered fatal error"); - return Err(BeaconEngineError::Common(error)) + return Err(BeaconConsensusEngineError::Common(error)) } } @@ -387,9 +390,13 @@ where Ok(OnForkChoiceUpdated::valid(status)) } - /// Sets the state of the canon chain tracker based on the given forkchoice update. This should - /// be called before issuing a VALID forkchoice update. - fn update_canon_chain(&self, update: &ForkchoiceState) -> Result<(), BeaconEngineError> { + /// Sets the state of the canon chain tracker based on the given forkchoice update. + /// + /// This should be called before issuing a VALID forkchoice update. + fn update_canon_chain( + &self, + update: &ForkchoiceState, + ) -> Result<(), BeaconConsensusEngineError> { if !update.finalized_block_hash.is_zero() { let finalized = self .blockchain @@ -699,7 +706,7 @@ where &mut self, ev: EngineSyncEvent, current_state: &ForkchoiceState, - ) -> Option> { + ) -> Option> { match ev { EngineSyncEvent::FetchedFullBlock(block) => { // it is guaranteed that the pipeline is not active at this point. @@ -720,7 +727,7 @@ where } EngineSyncEvent::PipelineTaskDropped => { error!(target: "consensus::engine", "Failed to receive spawned pipeline"); - return Some(Err(BeaconEngineError::PipelineChannelClosed)) + return Some(Err(BeaconConsensusEngineError::PipelineChannelClosed)) } EngineSyncEvent::PipelineFinished { result, reached_max_block } => { match result { @@ -762,7 +769,7 @@ where Client: HeadersClient + BodiesClient + Clone + Unpin + 'static, BT: BlockchainTreeEngine + BlockProvider + CanonChainTracker + Unpin + 'static, { - type Output = Result<(), BeaconEngineError>; + type Output = Result<(), BeaconConsensusEngineError>; fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { let this = self.get_mut(); @@ -896,7 +903,7 @@ mod tests { async fn send_new_payload( &self, payload: ExecutionPayload, - ) -> BeaconEngineResult { + ) -> Result { self.engine_handle.new_payload(payload).await } @@ -905,7 +912,7 @@ mod tests { async fn send_new_payload_retry_on_syncing( &self, payload: ExecutionPayload, - ) -> BeaconEngineResult { + ) -> Result { loop { let result = self.send_new_payload(payload.clone()).await?; if !result.is_syncing() { @@ -984,7 +991,7 @@ mod tests { fn spawn_consensus_engine( engine: TestBeaconConsensusEngine, - ) -> oneshot::Receiver> { + ) -> oneshot::Receiver> { let (tx, rx) = oneshot::channel(); tokio::spawn(async move { let result = engine.await; @@ -1018,7 +1025,7 @@ mod tests { .await; assert_matches!( res.await, - Ok(Err(BeaconEngineError::Pipeline(n))) if matches!(*n.as_ref(),PipelineError::Stage(StageError::ChannelClosed)) + Ok(Err(BeaconConsensusEngineError::Pipeline(n))) if matches!(*n.as_ref(),PipelineError::Stage(StageError::ChannelClosed)) ); } @@ -1056,7 +1063,7 @@ mod tests { .await; assert_matches!( rx.await, - Ok(Err(BeaconEngineError::Pipeline(n))) if matches!(*n.as_ref(),PipelineError::Stage(StageError::ChannelClosed)) + Ok(Err(BeaconConsensusEngineError::Pipeline(n))) if matches!(*n.as_ref(),PipelineError::Stage(StageError::ChannelClosed)) ); } @@ -1091,7 +1098,7 @@ mod tests { assert_matches!( rx.await, - Ok(Err(BeaconEngineError::Pipeline(n))) if matches!(*n.as_ref(),PipelineError::Stage(StageError::ChannelClosed)) + Ok(Err(BeaconConsensusEngineError::Pipeline(n))) if matches!(*n.as_ref(),PipelineError::Stage(StageError::ChannelClosed)) ); } diff --git a/crates/rpc/rpc-engine-api/src/engine_api.rs b/crates/rpc/rpc-engine-api/src/engine_api.rs index 0e4617fb9a..2117baba75 100644 --- a/crates/rpc/rpc-engine-api/src/engine_api.rs +++ b/crates/rpc/rpc-engine-api/src/engine_api.rs @@ -1,6 +1,6 @@ use crate::{EngineApiError, EngineApiMessageVersion, EngineApiResult}; use async_trait::async_trait; -use jsonrpsee_core::RpcResult as Result; +use jsonrpsee_core::RpcResult; use reth_beacon_consensus::BeaconConsensusEngineHandle; use reth_interfaces::consensus::ForkchoiceState; use reth_payload_builder::PayloadStore; @@ -296,14 +296,14 @@ where /// Handler for `engine_newPayloadV1` /// See also /// Caution: This should not accept the `withdrawals` field - async fn new_payload_v1(&self, payload: ExecutionPayload) -> Result { + async fn new_payload_v1(&self, payload: ExecutionPayload) -> RpcResult { trace!(target: "rpc::eth", "Serving engine_newPayloadV1"); Ok(EngineApi::new_payload_v1(self, payload).await?) } /// Handler for `engine_newPayloadV1` /// See also - async fn new_payload_v2(&self, payload: ExecutionPayload) -> Result { + async fn new_payload_v2(&self, payload: ExecutionPayload) -> RpcResult { trace!(target: "rpc::eth", "Serving engine_newPayloadV1"); Ok(EngineApi::new_payload_v2(self, payload).await?) } @@ -316,7 +316,7 @@ where &self, fork_choice_state: ForkchoiceState, payload_attributes: Option, - ) -> Result { + ) -> RpcResult { trace!(target: "rpc::eth", "Serving engine_forkchoiceUpdatedV1"); Ok(EngineApi::fork_choice_updated_v1(self, fork_choice_state, payload_attributes).await?) } @@ -327,7 +327,7 @@ where &self, fork_choice_state: ForkchoiceState, payload_attributes: Option, - ) -> Result { + ) -> RpcResult { trace!(target: "rpc::eth", "Serving engine_forkchoiceUpdatedV2"); Ok(EngineApi::fork_choice_updated_v2(self, fork_choice_state, payload_attributes).await?) } @@ -343,7 +343,7 @@ where /// /// Note: /// > Client software MAY stop the corresponding build process after serving this call. - async fn get_payload_v1(&self, payload_id: PayloadId) -> Result { + async fn get_payload_v1(&self, payload_id: PayloadId) -> RpcResult { trace!(target: "rpc::eth", "Serving engine_getPayloadV1"); Ok(EngineApi::get_payload_v1(self, payload_id).await?) } @@ -357,7 +357,7 @@ where /// /// Note: /// > Client software MAY stop the corresponding build process after serving this call. - async fn get_payload_v2(&self, payload_id: PayloadId) -> Result { + async fn get_payload_v2(&self, payload_id: PayloadId) -> RpcResult { trace!(target: "rpc::eth", "Serving engine_getPayloadV2"); Ok(EngineApi::get_payload_v2(self, payload_id).await?) } @@ -367,7 +367,7 @@ where async fn get_payload_bodies_by_hash_v1( &self, block_hashes: Vec, - ) -> Result { + ) -> RpcResult { trace!(target: "rpc::eth", "Serving engine_getPayloadBodiesByHashV1"); Ok(EngineApi::get_payload_bodies_by_hash(self, block_hashes)?) } @@ -378,7 +378,7 @@ where &self, start: U64, count: U64, - ) -> Result { + ) -> RpcResult { trace!(target: "rpc::eth", "Serving engine_getPayloadBodiesByHashV1"); Ok(EngineApi::get_payload_bodies_by_range(self, start.as_u64(), count.as_u64())?) } @@ -388,14 +388,14 @@ where async fn exchange_transition_configuration( &self, config: TransitionConfiguration, - ) -> Result { + ) -> RpcResult { trace!(target: "rpc::eth", "Serving engine_getPayloadBodiesByHashV1"); Ok(EngineApi::exchange_transition_configuration(self, config)?) } /// Handler for `engine_exchangeCapabilitiesV1` /// See also - async fn exchange_capabilities(&self, _capabilities: Vec) -> Result> { + async fn exchange_capabilities(&self, _capabilities: Vec) -> RpcResult> { Ok(CAPABILITIES.into_iter().map(str::to_owned).collect()) } } diff --git a/crates/rpc/rpc-engine-api/src/error.rs b/crates/rpc/rpc-engine-api/src/error.rs index 9be67f0f4f..03c55c1a0e 100644 --- a/crates/rpc/rpc-engine-api/src/error.rs +++ b/crates/rpc/rpc-engine-api/src/error.rs @@ -1,9 +1,8 @@ use jsonrpsee_types::error::{INTERNAL_ERROR_CODE, INVALID_PARAMS_CODE}; -use reth_beacon_consensus::{BeaconEngineError, BeaconForkChoiceUpdateError}; +use reth_beacon_consensus::{BeaconForkChoiceUpdateError, BeaconOnNewPayloadError}; use reth_payload_builder::error::PayloadBuilderError; use reth_primitives::{H256, U256}; use thiserror::Error; -use tokio::sync::{mpsc, oneshot}; /// The Engine API result type pub type EngineApiResult = Result; @@ -15,7 +14,7 @@ pub const REQUEST_TOO_LARGE_CODE: i32 = -38004; /// Error returned by [`EngineApi`][crate::EngineApi] /// -/// Note: This is a high fidelity error type which can be converted to an RPC error that adheres to the spec: +/// Note: This is a high-fidelity error type which can be converted to an RPC error that adheres to the spec: #[derive(Error, Debug)] pub enum EngineApiError { /// Unknown payload requested. @@ -64,35 +63,20 @@ pub enum EngineApiError { /// Consensus terminal block hash. consensus: H256, }, - /// Beacon consensus engine error. - #[error(transparent)] - ConsensusEngine(#[from] BeaconEngineError), - /// An error occurred while processing the fork choice update. + /// An error occurred while processing the fork choice update in the beacon consensus engine. #[error(transparent)] ForkChoiceUpdate(#[from] BeaconForkChoiceUpdateError), + /// An error occurred while processing a new payload in the beacon consensus engine. + #[error(transparent)] + NewPayload(#[from] BeaconOnNewPayloadError), /// Encountered an internal error. #[error(transparent)] Internal(Box), - /// Failed to send message due ot closed channel - #[error("Closed channel")] - ChannelClosed, /// Fetching the payload failed #[error(transparent)] GetPayloadError(#[from] PayloadBuilderError), } -impl From> for EngineApiError { - fn from(_: mpsc::error::SendError) -> Self { - EngineApiError::ChannelClosed - } -} - -impl From for EngineApiError { - fn from(_: oneshot::error::RecvError) -> Self { - EngineApiError::ChannelClosed - } -} - impl From for jsonrpsee_types::error::ErrorObject<'static> { fn from(error: EngineApiError) -> Self { let code = match error { @@ -102,15 +86,22 @@ impl From for jsonrpsee_types::error::ErrorObject<'static> { EngineApiError::HasWithdrawalsPreShanghai => INVALID_PARAMS_CODE, EngineApiError::UnknownPayload => UNKNOWN_PAYLOAD_CODE, EngineApiError::PayloadRequestTooLarge { .. } => REQUEST_TOO_LARGE_CODE, + + // Error responses from the consensus engine + EngineApiError::ForkChoiceUpdate(err) => match err { + BeaconForkChoiceUpdateError::ForkchoiceUpdateError(err) => return err.into(), + BeaconForkChoiceUpdateError::EngineUnavailable => INTERNAL_ERROR_CODE, + }, + EngineApiError::NewPayload(ref err) => match err { + BeaconOnNewPayloadError::EngineUnavailable => INTERNAL_ERROR_CODE, + }, // Any other server error - _ => INTERNAL_ERROR_CODE, + EngineApiError::TerminalTD { .. } | + EngineApiError::TerminalBlockHash { .. } | + EngineApiError::Internal(_) | + EngineApiError::GetPayloadError(_) => INTERNAL_ERROR_CODE, }; - jsonrpsee_types::error::ErrorObject::owned( - code, - // TODO properly convert to rpc error - error.to_string(), - None::<()>, - ) + jsonrpsee_types::error::ErrorObject::owned(code, error.to_string(), None::<()>) } } diff --git a/crates/rpc/rpc-types/src/eth/engine/forkchoice.rs b/crates/rpc/rpc-types/src/eth/engine/forkchoice.rs index bf107cd839..ea215df5c1 100644 --- a/crates/rpc/rpc-types/src/eth/engine/forkchoice.rs +++ b/crates/rpc/rpc-types/src/eth/engine/forkchoice.rs @@ -30,8 +30,11 @@ pub struct ForkchoiceState { pub finalized_block_hash: H256, } -/// A standalone forkchoice update result for RPC. -#[derive(Clone, Debug, PartialEq, Eq, thiserror::Error)] +/// A standalone forkchoice update errors for RPC. +/// +/// These are considered hard RPC errors and are _not_ returned as [PayloadStatus] or +/// [PayloadStatusEnum::Invalid]. +#[derive(Clone, Copy, Debug, PartialEq, Eq, thiserror::Error)] pub enum ForkchoiceUpdateError { /// The forkchoice update has been processed, but the requested contained invalid /// [PayloadAttributes](crate::engine::PayloadAttributes). @@ -42,6 +45,9 @@ pub enum ForkchoiceUpdateError { /// The given [ForkchoiceState] is invalid or inconsistent. #[error("Invalid forkchoice state")] InvalidState, + /// Thrown when a forkchoice final block does not exist in the database. + #[error("final block not available in database")] + UnknownFinalBlock, } impl From for jsonrpsee_types::error::ErrorObject<'static> { @@ -59,6 +65,11 @@ impl From for jsonrpsee_types::error::ErrorObject<'static INVALID_FORK_CHOICE_STATE_ERROR_MSG, None::<()>, ), + ForkchoiceUpdateError::UnknownFinalBlock => jsonrpsee_types::error::ErrorObject::owned( + INVALID_FORK_CHOICE_STATE_ERROR, + INVALID_FORK_CHOICE_STATE_ERROR_MSG, + None::<()>, + ), } } } diff --git a/crates/rpc/rpc-types/src/eth/engine/payload.rs b/crates/rpc/rpc-types/src/eth/engine/payload.rs index bda7526e90..f57b65c444 100644 --- a/crates/rpc/rpc-types/src/eth/engine/payload.rs +++ b/crates/rpc/rpc-types/src/eth/engine/payload.rs @@ -187,6 +187,7 @@ impl TryFrom for SealedBlock { } } +/// Error that can occur when handling payloads. #[derive(thiserror::Error, Debug)] pub enum PayloadError { /// Invalid payload extra data. @@ -252,7 +253,7 @@ pub struct PayloadAttributes { pub withdrawals: Option>, } -/// This structure contains the result of processing a payload +/// This structure contains the result of processing a payload or fork choice update. #[derive(Clone, Debug, PartialEq, Eq, Deserialize)] #[serde(rename_all = "camelCase")] pub struct PayloadStatus { @@ -393,7 +394,9 @@ impl PayloadStatusEnum { } } -/// Various validation errors +/// Various errors that can occur when validating a payload or forkchoice update. +/// +/// This is intended for the [PayloadStatusEnum::Invalid] variant. #[derive(Debug, Clone, PartialEq, Eq, thiserror::Error)] pub enum PayloadValidationError { /// Thrown when a forkchoice update's head links to a previously rejected payload.