diff --git a/crates/consensus/beacon/src/engine/error.rs b/crates/consensus/beacon/src/engine/error.rs index 0eef90ea7e..6fabfbf031 100644 --- a/crates/consensus/beacon/src/engine/error.rs +++ b/crates/consensus/beacon/src/engine/error.rs @@ -1,5 +1,5 @@ use crate::engine::hooks::EngineHookError; -use alloy_rpc_types_engine::ForkchoiceUpdateError; +pub use reth_engine_primitives::BeaconForkChoiceUpdateError; use reth_errors::{DatabaseError, RethError}; use reth_stages_api::PipelineError; @@ -42,38 +42,3 @@ impl From for BeaconConsensusEngineError { Self::Common(e.into()) } } - -/// Represents error cases for an applied forkchoice update. -/// -/// This represents all possible error cases, that must be returned as JSON RPC errors back to the -/// beacon node. -#[derive(Debug, thiserror::Error)] -pub enum BeaconForkChoiceUpdateError { - /// Thrown when a forkchoice update resulted in an error. - #[error("forkchoice update error: {0}")] - ForkchoiceUpdateError(#[from] ForkchoiceUpdateError), - /// Thrown when the engine task is unavailable/stopped. - #[error("beacon consensus engine task stopped")] - EngineUnavailable, - /// An internal error occurred, not necessarily related to the update. - #[error(transparent)] - Internal(Box), -} - -impl BeaconForkChoiceUpdateError { - /// Create a new internal error. - pub fn internal(e: E) -> Self { - Self::Internal(Box::new(e)) - } -} - -impl From for BeaconForkChoiceUpdateError { - fn from(e: RethError) -> Self { - Self::internal(e) - } -} -impl From for BeaconForkChoiceUpdateError { - fn from(e: DatabaseError) -> Self { - Self::internal(e) - } -} diff --git a/crates/consensus/beacon/src/engine/handle.rs b/crates/consensus/beacon/src/engine/handle.rs index 7d6dd3cff3..e4f291c064 100644 --- a/crates/consensus/beacon/src/engine/handle.rs +++ b/crates/consensus/beacon/src/engine/handle.rs @@ -1,94 +1,3 @@ //! `BeaconConsensusEngine` external API -use crate::BeaconForkChoiceUpdateError; -use alloy_rpc_types_engine::{ - ExecutionPayload, ExecutionPayloadSidecar, ForkchoiceState, ForkchoiceUpdated, PayloadStatus, -}; -use futures::TryFutureExt; -use reth_engine_primitives::{ - BeaconEngineMessage, BeaconOnNewPayloadError, EngineApiMessageVersion, EngineTypes, - OnForkChoiceUpdated, -}; -use reth_errors::RethResult; -use tokio::sync::{mpsc::UnboundedSender, oneshot}; - -/// A _shareable_ beacon consensus frontend type. Used to interact with the spawned beacon consensus -/// engine task. -/// -/// See also `BeaconConsensusEngine` -#[derive(Debug, Clone)] -pub struct BeaconConsensusEngineHandle -where - Engine: EngineTypes, -{ - pub(crate) to_engine: UnboundedSender>, -} - -// === impl BeaconConsensusEngineHandle === - -impl BeaconConsensusEngineHandle -where - Engine: EngineTypes, -{ - /// Creates a new beacon consensus engine handle. - pub const fn new(to_engine: UnboundedSender>) -> Self { - Self { to_engine } - } - - /// Sends a new payload message to the beacon consensus engine and waits for a response. - /// - /// See also - pub async fn new_payload( - &self, - payload: ExecutionPayload, - sidecar: ExecutionPayloadSidecar, - ) -> Result { - let (tx, rx) = oneshot::channel(); - let _ = self.to_engine.send(BeaconEngineMessage::NewPayload { payload, sidecar, tx }); - rx.await.map_err(|_| BeaconOnNewPayloadError::EngineUnavailable)? - } - - /// Sends a forkchoice update message to the beacon consensus engine and waits for a response. - /// - /// See also - pub async fn fork_choice_updated( - &self, - state: ForkchoiceState, - payload_attrs: Option, - version: EngineApiMessageVersion, - ) -> Result { - Ok(self - .send_fork_choice_updated(state, payload_attrs, version) - .map_err(|_| BeaconForkChoiceUpdateError::EngineUnavailable) - .await?? - .await?) - } - - /// Sends a forkchoice update message to the beacon consensus engine and returns the receiver to - /// wait for a response. - fn send_fork_choice_updated( - &self, - state: ForkchoiceState, - payload_attrs: Option, - version: EngineApiMessageVersion, - ) -> oneshot::Receiver> { - let (tx, rx) = oneshot::channel(); - let _ = self.to_engine.send(BeaconEngineMessage::ForkchoiceUpdated { - state, - payload_attrs, - tx, - version, - }); - rx - } - - /// Sends a transition configuration exchange message to the beacon consensus engine. - /// - /// See also - /// - /// This only notifies about the exchange. The actual exchange is done by the engine API impl - /// itself. - pub fn transition_configuration_exchanged(&self) { - let _ = self.to_engine.send(BeaconEngineMessage::TransitionConfigurationExchanged); - } -} +pub use reth_engine_primitives::BeaconConsensusEngineHandle; diff --git a/crates/engine/primitives/src/error.rs b/crates/engine/primitives/src/error.rs index b7deb607bc..18e72fe83e 100644 --- a/crates/engine/primitives/src/error.rs +++ b/crates/engine/primitives/src/error.rs @@ -1,3 +1,5 @@ +use alloy_rpc_types_engine::ForkchoiceUpdateError; + /// Represents all error cases when handling a new payload. /// /// This represents all possible error cases that must be returned as JSON RCP errors back to the @@ -18,3 +20,27 @@ impl BeaconOnNewPayloadError { Self::Internal(Box::new(e)) } } + +/// Represents error cases for an applied forkchoice update. +/// +/// This represents all possible error cases, that must be returned as JSON RPC errors back to the +/// beacon node. +#[derive(Debug, thiserror::Error)] +pub enum BeaconForkChoiceUpdateError { + /// Thrown when a forkchoice update resulted in an error. + #[error("forkchoice update error: {0}")] + ForkchoiceUpdateError(#[from] ForkchoiceUpdateError), + /// Thrown when the engine task is unavailable/stopped. + #[error("beacon consensus engine task stopped")] + EngineUnavailable, + /// An internal error occurred, not necessarily related to the update. + #[error(transparent)] + Internal(Box), +} + +impl BeaconForkChoiceUpdateError { + /// Create a new internal error. + pub fn internal(e: E) -> Self { + Self::Internal(Box::new(e)) + } +} diff --git a/crates/engine/primitives/src/lib.rs b/crates/engine/primitives/src/lib.rs index 9921023c4a..f7877257c1 100644 --- a/crates/engine/primitives/src/lib.rs +++ b/crates/engine/primitives/src/lib.rs @@ -14,13 +14,13 @@ use core::fmt; use alloy_consensus::BlockHeader; use alloy_rpc_types_engine::{ExecutionPayload, ExecutionPayloadSidecar, PayloadError}; -pub use error::BeaconOnNewPayloadError; +pub use error::*; mod forkchoice; pub use forkchoice::{ForkchoiceStateHash, ForkchoiceStateTracker, ForkchoiceStatus}; mod message; -pub use message::{BeaconEngineMessage, OnForkChoiceUpdated}; +pub use message::*; mod invalid_block_hook; pub use invalid_block_hook::InvalidBlockHook; diff --git a/crates/engine/primitives/src/message.rs b/crates/engine/primitives/src/message.rs index d8a4c1322a..6e4f462927 100644 --- a/crates/engine/primitives/src/message.rs +++ b/crates/engine/primitives/src/message.rs @@ -1,9 +1,12 @@ -use crate::{BeaconOnNewPayloadError, EngineApiMessageVersion, EngineTypes, ForkchoiceStatus}; +use crate::{ + error::BeaconForkChoiceUpdateError, BeaconOnNewPayloadError, EngineApiMessageVersion, + EngineTypes, ForkchoiceStatus, +}; use alloy_rpc_types_engine::{ ExecutionPayload, ExecutionPayloadSidecar, ForkChoiceUpdateResult, ForkchoiceState, ForkchoiceUpdateError, ForkchoiceUpdated, PayloadId, PayloadStatus, PayloadStatusEnum, }; -use futures::{future::Either, FutureExt}; +use futures::{future::Either, FutureExt, TryFutureExt}; use reth_errors::RethResult; use reth_payload_builder_primitives::PayloadBuilderError; use std::{ @@ -12,7 +15,7 @@ use std::{ pin::Pin, task::{ready, Context, Poll}, }; -use tokio::sync::oneshot; +use tokio::sync::{mpsc::UnboundedSender, oneshot}; /// Represents the outcome of forkchoice update. /// @@ -191,3 +194,82 @@ impl Display for BeaconEngineMessage { } } } + +/// A clonable sender type that can be used to send engine API messages. +/// +/// This type mirrors consensus related functions of the engine API. +#[derive(Debug, Clone)] +pub struct BeaconConsensusEngineHandle +where + Engine: EngineTypes, +{ + to_engine: UnboundedSender>, +} + +impl BeaconConsensusEngineHandle +where + Engine: EngineTypes, +{ + /// Creates a new beacon consensus engine handle. + pub const fn new(to_engine: UnboundedSender>) -> Self { + Self { to_engine } + } + + /// Sends a new payload message to the beacon consensus engine and waits for a response. + /// + /// See also + pub async fn new_payload( + &self, + payload: ExecutionPayload, + sidecar: ExecutionPayloadSidecar, + ) -> Result { + let (tx, rx) = oneshot::channel(); + let _ = self.to_engine.send(BeaconEngineMessage::NewPayload { payload, sidecar, tx }); + rx.await.map_err(|_| BeaconOnNewPayloadError::EngineUnavailable)? + } + + /// Sends a forkchoice update message to the beacon consensus engine and waits for a response. + /// + /// See also + pub async fn fork_choice_updated( + &self, + state: ForkchoiceState, + payload_attrs: Option, + version: EngineApiMessageVersion, + ) -> Result { + Ok(self + .send_fork_choice_updated(state, payload_attrs, version) + .map_err(|_| BeaconForkChoiceUpdateError::EngineUnavailable) + .await? + .map_err(BeaconForkChoiceUpdateError::internal)? + .await?) + } + + /// Sends a forkchoice update message to the beacon consensus engine and returns the receiver to + /// wait for a response. + fn send_fork_choice_updated( + &self, + state: ForkchoiceState, + payload_attrs: Option, + version: EngineApiMessageVersion, + ) -> oneshot::Receiver> { + let (tx, rx) = oneshot::channel(); + let _ = self.to_engine.send(BeaconEngineMessage::ForkchoiceUpdated { + state, + payload_attrs, + tx, + version, + }); + rx + } + + /// Sends a transition configuration exchange message to the beacon consensus engine. + /// + /// See also + /// + /// This only notifies about the exchange. The actual exchange is done by the engine API impl + /// itself. + pub fn transition_configuration_exchanged(&self) { + let _ = self.to_engine.send(BeaconEngineMessage::TransitionConfigurationExchanged); + } +}