diff --git a/crates/consensus/beacon/src/engine/handle.rs b/crates/consensus/beacon/src/engine/handle.rs new file mode 100644 index 0000000000..d2b8661b60 --- /dev/null +++ b/crates/consensus/beacon/src/engine/handle.rs @@ -0,0 +1,87 @@ +//! `BeaconConsensusEngine` external API + +use crate::{ + engine::message::OnForkChoiceUpdated, BeaconConsensusEngineEvent, BeaconEngineMessage, + BeaconForkChoiceUpdateError, BeaconOnNewPayloadError, +}; +use futures::TryFutureExt; +use reth_rpc_types::engine::{ + ExecutionPayload, ForkchoiceState, ForkchoiceUpdated, PayloadAttributes, PayloadStatus, +}; +use tokio::sync::{mpsc, mpsc::UnboundedSender, oneshot}; +use tokio_stream::wrappers::UnboundedReceiverStream; + +/// A _shareable_ beacon consensus frontend type. Used to interact with the spawned beacon consensus +/// engine task. +/// +/// See also [`BeaconConsensusEngine`](crate::engine::BeaconConsensusEngine). +#[derive(Clone, Debug)] +pub struct BeaconConsensusEngineHandle { + pub(crate) to_engine: UnboundedSender, +} + +// === impl BeaconConsensusEngineHandle === + +impl BeaconConsensusEngineHandle { + /// Creates a new beacon consensus engine handle. + pub fn new(to_engine: UnboundedSender) -> Self { + Self { to_engine } + } + + /// Sends a new payload message to the beacon consensus engine and waits for a response. + /// + /// See also + pub async fn new_payload( + &self, + payload: ExecutionPayload, + ) -> Result { + let (tx, rx) = oneshot::channel(); + let _ = self.to_engine.send(BeaconEngineMessage::NewPayload { payload, tx }); + rx.await.map_err(|_| BeaconOnNewPayloadError::EngineUnavailable)? + } + + /// Sends a forkchoice update message to the beacon consensus engine and waits for a response. + /// + /// See also + pub async fn fork_choice_updated( + &self, + state: ForkchoiceState, + payload_attrs: Option, + ) -> Result { + Ok(self + .send_fork_choice_updated(state, payload_attrs) + .map_err(|_| BeaconForkChoiceUpdateError::EngineUnavailable) + .await?? + .await?) + } + + /// Sends a forkchoice update message to the beacon consensus engine and returns the receiver to + /// wait for a response. + fn send_fork_choice_updated( + &self, + state: ForkchoiceState, + payload_attrs: Option, + ) -> oneshot::Receiver> { + let (tx, rx) = oneshot::channel(); + let _ = self.to_engine.send(BeaconEngineMessage::ForkchoiceUpdated { + state, + payload_attrs, + tx, + }); + rx + } + + /// Sends a transition configuration exchagne message to the beacon consensus engine. + /// + /// See also + pub async fn transition_configuration_exchanged(&self) { + let _ = self.to_engine.send(BeaconEngineMessage::TransitionConfigurationExchanged); + } + + /// Creates a new [`BeaconConsensusEngineEvent`] listener stream. + pub fn event_listener(&self) -> UnboundedReceiverStream { + let (tx, rx) = mpsc::unbounded_channel(); + let _ = self.to_engine.send(BeaconEngineMessage::EventListener(tx)); + UnboundedReceiverStream::new(rx) + } +} diff --git a/crates/consensus/beacon/src/engine/mod.rs b/crates/consensus/beacon/src/engine/mod.rs index d397fcf378..175f6c5dca 100644 --- a/crates/consensus/beacon/src/engine/mod.rs +++ b/crates/consensus/beacon/src/engine/mod.rs @@ -1,13 +1,17 @@ use crate::{ - engine::{message::OnForkChoiceUpdated, metrics::EngineMetrics}, + engine::{ + forkchoice::{ForkchoiceStateHash, ForkchoiceStateTracker}, + message::OnForkChoiceUpdated, + metrics::EngineMetrics, + }, sync::{EngineSyncController, EngineSyncEvent}, }; -use futures::{Future, StreamExt, TryFutureExt}; +use futures::{Future, StreamExt}; use reth_db::database::Database; use reth_interfaces::{ blockchain_tree::{ error::{InsertBlockError, InsertBlockErrorKind}, - BlockStatus, BlockchainTreeEngine, + BlockStatus, BlockchainTreeEngine, InsertPayloadOk, }, consensus::ForkchoiceState, executor::{BlockExecutionError, BlockValidationError}, @@ -17,15 +21,14 @@ use reth_interfaces::{ }; use reth_payload_builder::{PayloadBuilderAttributes, PayloadBuilderHandle}; use reth_primitives::{ - listener::EventListeners, stage::StageId, BlockNumHash, BlockNumber, Head, Header, SealedBlock, - SealedHeader, H256, U256, + constants::EPOCH_SLOTS, listener::EventListeners, stage::StageId, BlockNumHash, BlockNumber, + Head, Header, SealedBlock, SealedHeader, H256, U256, }; use reth_provider::{ BlockReader, BlockSource, CanonChainTracker, ProviderError, StageCheckpointReader, }; use reth_rpc_types::engine::{ - ExecutionPayload, ForkchoiceUpdated, PayloadAttributes, PayloadStatus, PayloadStatusEnum, - PayloadValidationError, + ExecutionPayload, PayloadAttributes, PayloadStatus, PayloadStatusEnum, PayloadValidationError, }; use reth_stages::{ControlFlow, Pipeline, PipelineError}; use reth_tasks::TaskSpawner; @@ -53,16 +56,15 @@ pub use error::{ mod invalid_headers; use invalid_headers::InvalidHeaderCache; -mod metrics; mod event; +pub use event::BeaconConsensusEngineEvent; mod forkchoice; +mod metrics; pub(crate) mod sync; -use crate::engine::forkchoice::{ForkchoiceStateHash, ForkchoiceStateTracker}; -pub use event::BeaconConsensusEngineEvent; -use reth_interfaces::blockchain_tree::InsertPayloadOk; -use reth_primitives::constants::EPOCH_SLOTS; +mod handle; +pub use handle::BeaconConsensusEngineHandle; /// The maximum number of invalid headers that can be tracked by the engine. const MAX_INVALID_HEADERS: u32 = 512u32; @@ -74,81 +76,6 @@ const MAX_INVALID_HEADERS: u32 = 512u32; /// If the distance exceeds this threshold, the pipeline will be used for sync. pub const MIN_BLOCKS_FOR_PIPELINE_RUN: u64 = EPOCH_SLOTS; -/// A _shareable_ beacon consensus frontend. Used to interact with the spawned beacon consensus -/// engine. -/// -/// See also [`BeaconConsensusEngine`]. -#[derive(Clone, Debug)] -pub struct BeaconConsensusEngineHandle { - to_engine: UnboundedSender, -} - -// === impl BeaconConsensusEngineHandle === - -impl BeaconConsensusEngineHandle { - /// Creates a new beacon consensus engine handle. - pub fn new(to_engine: UnboundedSender) -> Self { - Self { to_engine } - } - - /// Sends a new payload message to the beacon consensus engine and waits for a response. - /// - /// See also - pub async fn new_payload( - &self, - payload: ExecutionPayload, - ) -> Result { - let (tx, rx) = oneshot::channel(); - let _ = self.to_engine.send(BeaconEngineMessage::NewPayload { payload, tx }); - rx.await.map_err(|_| BeaconOnNewPayloadError::EngineUnavailable)? - } - - /// Sends a forkchoice update message to the beacon consensus engine and waits for a response. - /// - /// See also - pub async fn fork_choice_updated( - &self, - state: ForkchoiceState, - payload_attrs: Option, - ) -> Result { - Ok(self - .send_fork_choice_updated(state, payload_attrs) - .map_err(|_| BeaconForkChoiceUpdateError::EngineUnavailable) - .await?? - .await?) - } - - /// Sends a forkchoice update message to the beacon consensus engine and returns the receiver to - /// wait for a response. - fn send_fork_choice_updated( - &self, - state: ForkchoiceState, - payload_attrs: Option, - ) -> oneshot::Receiver> { - let (tx, rx) = oneshot::channel(); - let _ = self.to_engine.send(BeaconEngineMessage::ForkchoiceUpdated { - state, - payload_attrs, - tx, - }); - rx - } - - /// Sends a transition configuration exchagne message to the beacon consensus engine. - /// - /// See also - pub async fn transition_configuration_exchanged(&self) { - let _ = self.to_engine.send(BeaconEngineMessage::TransitionConfigurationExchanged); - } - - /// Creates a new [`BeaconConsensusEngineEvent`] listener stream. - pub fn event_listener(&self) -> UnboundedReceiverStream { - let (tx, rx) = mpsc::unbounded_channel(); - let _ = self.to_engine.send(BeaconEngineMessage::EventListener(tx)); - UnboundedReceiverStream::new(rx) - } -} - /// The beacon consensus engine is the driver that switches between historical and live sync. /// /// The beacon consensus engine is itself driven by messages from the Consensus Layer, which are @@ -1493,7 +1420,7 @@ where #[cfg(test)] mod tests { use super::*; - use crate::engine::error::BeaconForkChoiceUpdateError; + use crate::{BeaconForkChoiceUpdateError, BeaconOnNewPayloadError}; use assert_matches::assert_matches; use reth_blockchain_tree::{ config::BlockchainTreeConfig, externals::TreeExternals, post_state::PostState, @@ -1510,6 +1437,9 @@ mod tests { providers::BlockchainProvider, test_utils::TestExecutorFactory, BlockWriter, ProviderFactory, }; + use reth_rpc_types::engine::{ + ExecutionPayload, ForkchoiceState, ForkchoiceUpdated, PayloadStatus, + }; use reth_stages::{test_utils::TestStages, ExecOutput, PipelineError, StageError}; use reth_tasks::TokioTaskExecutor; use std::{collections::VecDeque, sync::Arc, time::Duration};