chore: move BeaconConsensusEngineHandle to separate file (#3633)

This commit is contained in:
Matthias Seitz
2023-07-06 17:41:14 +02:00
committed by GitHub
parent 77cd4abbdc
commit aef9480da0
2 changed files with 105 additions and 88 deletions

View File

@@ -0,0 +1,87 @@
//! `BeaconConsensusEngine` external API
use crate::{
engine::message::OnForkChoiceUpdated, BeaconConsensusEngineEvent, BeaconEngineMessage,
BeaconForkChoiceUpdateError, BeaconOnNewPayloadError,
};
use futures::TryFutureExt;
use reth_rpc_types::engine::{
ExecutionPayload, ForkchoiceState, ForkchoiceUpdated, PayloadAttributes, PayloadStatus,
};
use tokio::sync::{mpsc, mpsc::UnboundedSender, oneshot};
use tokio_stream::wrappers::UnboundedReceiverStream;
/// A _shareable_ beacon consensus frontend type. Used to interact with the spawned beacon consensus
/// engine task.
///
/// See also [`BeaconConsensusEngine`](crate::engine::BeaconConsensusEngine).
#[derive(Clone, Debug)]
pub struct BeaconConsensusEngineHandle {
pub(crate) to_engine: UnboundedSender<BeaconEngineMessage>,
}
// === impl BeaconConsensusEngineHandle ===
impl BeaconConsensusEngineHandle {
/// Creates a new beacon consensus engine handle.
pub fn new(to_engine: UnboundedSender<BeaconEngineMessage>) -> Self {
Self { to_engine }
}
/// Sends a new payload message to the beacon consensus engine and waits for a response.
///
/// See also <https://github.com/ethereum/execution-apis/blob/3d627c95a4d3510a8187dd02e0250ecb4331d27e/src/engine/shanghai.md#engine_newpayloadv2>
pub async fn new_payload(
&self,
payload: ExecutionPayload,
) -> Result<PayloadStatus, BeaconOnNewPayloadError> {
let (tx, rx) = oneshot::channel();
let _ = self.to_engine.send(BeaconEngineMessage::NewPayload { payload, tx });
rx.await.map_err(|_| BeaconOnNewPayloadError::EngineUnavailable)?
}
/// Sends a forkchoice update message to the beacon consensus engine and waits for a response.
///
/// See also <https://github.com/ethereum/execution-apis/blob/3d627c95a4d3510a8187dd02e0250ecb4331d27e/src/engine/shanghai.md#engine_forkchoiceupdatedv2>
pub async fn fork_choice_updated(
&self,
state: ForkchoiceState,
payload_attrs: Option<PayloadAttributes>,
) -> Result<ForkchoiceUpdated, BeaconForkChoiceUpdateError> {
Ok(self
.send_fork_choice_updated(state, payload_attrs)
.map_err(|_| BeaconForkChoiceUpdateError::EngineUnavailable)
.await??
.await?)
}
/// Sends a forkchoice update message to the beacon consensus engine and returns the receiver to
/// wait for a response.
fn send_fork_choice_updated(
&self,
state: ForkchoiceState,
payload_attrs: Option<PayloadAttributes>,
) -> oneshot::Receiver<Result<OnForkChoiceUpdated, reth_interfaces::Error>> {
let (tx, rx) = oneshot::channel();
let _ = self.to_engine.send(BeaconEngineMessage::ForkchoiceUpdated {
state,
payload_attrs,
tx,
});
rx
}
/// Sends a transition configuration exchagne message to the beacon consensus engine.
///
/// See also <https://github.com/ethereum/execution-apis/blob/3d627c95a4d3510a8187dd02e0250ecb4331d27e/src/engine/paris.md#engine_exchangetransitionconfigurationv1>
pub async fn transition_configuration_exchanged(&self) {
let _ = self.to_engine.send(BeaconEngineMessage::TransitionConfigurationExchanged);
}
/// Creates a new [`BeaconConsensusEngineEvent`] listener stream.
pub fn event_listener(&self) -> UnboundedReceiverStream<BeaconConsensusEngineEvent> {
let (tx, rx) = mpsc::unbounded_channel();
let _ = self.to_engine.send(BeaconEngineMessage::EventListener(tx));
UnboundedReceiverStream::new(rx)
}
}

View File

@@ -1,13 +1,17 @@
use crate::{
engine::{message::OnForkChoiceUpdated, metrics::EngineMetrics},
engine::{
forkchoice::{ForkchoiceStateHash, ForkchoiceStateTracker},
message::OnForkChoiceUpdated,
metrics::EngineMetrics,
},
sync::{EngineSyncController, EngineSyncEvent},
};
use futures::{Future, StreamExt, TryFutureExt};
use futures::{Future, StreamExt};
use reth_db::database::Database;
use reth_interfaces::{
blockchain_tree::{
error::{InsertBlockError, InsertBlockErrorKind},
BlockStatus, BlockchainTreeEngine,
BlockStatus, BlockchainTreeEngine, InsertPayloadOk,
},
consensus::ForkchoiceState,
executor::{BlockExecutionError, BlockValidationError},
@@ -17,15 +21,14 @@ use reth_interfaces::{
};
use reth_payload_builder::{PayloadBuilderAttributes, PayloadBuilderHandle};
use reth_primitives::{
listener::EventListeners, stage::StageId, BlockNumHash, BlockNumber, Head, Header, SealedBlock,
SealedHeader, H256, U256,
constants::EPOCH_SLOTS, listener::EventListeners, stage::StageId, BlockNumHash, BlockNumber,
Head, Header, SealedBlock, SealedHeader, H256, U256,
};
use reth_provider::{
BlockReader, BlockSource, CanonChainTracker, ProviderError, StageCheckpointReader,
};
use reth_rpc_types::engine::{
ExecutionPayload, ForkchoiceUpdated, PayloadAttributes, PayloadStatus, PayloadStatusEnum,
PayloadValidationError,
ExecutionPayload, PayloadAttributes, PayloadStatus, PayloadStatusEnum, PayloadValidationError,
};
use reth_stages::{ControlFlow, Pipeline, PipelineError};
use reth_tasks::TaskSpawner;
@@ -53,16 +56,15 @@ pub use error::{
mod invalid_headers;
use invalid_headers::InvalidHeaderCache;
mod metrics;
mod event;
pub use event::BeaconConsensusEngineEvent;
mod forkchoice;
mod metrics;
pub(crate) mod sync;
use crate::engine::forkchoice::{ForkchoiceStateHash, ForkchoiceStateTracker};
pub use event::BeaconConsensusEngineEvent;
use reth_interfaces::blockchain_tree::InsertPayloadOk;
use reth_primitives::constants::EPOCH_SLOTS;
mod handle;
pub use handle::BeaconConsensusEngineHandle;
/// The maximum number of invalid headers that can be tracked by the engine.
const MAX_INVALID_HEADERS: u32 = 512u32;
@@ -74,81 +76,6 @@ const MAX_INVALID_HEADERS: u32 = 512u32;
/// If the distance exceeds this threshold, the pipeline will be used for sync.
pub const MIN_BLOCKS_FOR_PIPELINE_RUN: u64 = EPOCH_SLOTS;
/// A _shareable_ beacon consensus frontend. Used to interact with the spawned beacon consensus
/// engine.
///
/// See also [`BeaconConsensusEngine`].
#[derive(Clone, Debug)]
pub struct BeaconConsensusEngineHandle {
to_engine: UnboundedSender<BeaconEngineMessage>,
}
// === impl BeaconConsensusEngineHandle ===
impl BeaconConsensusEngineHandle {
/// Creates a new beacon consensus engine handle.
pub fn new(to_engine: UnboundedSender<BeaconEngineMessage>) -> Self {
Self { to_engine }
}
/// Sends a new payload message to the beacon consensus engine and waits for a response.
///
/// See also <https://github.com/ethereum/execution-apis/blob/3d627c95a4d3510a8187dd02e0250ecb4331d27e/src/engine/shanghai.md#engine_newpayloadv2>
pub async fn new_payload(
&self,
payload: ExecutionPayload,
) -> Result<PayloadStatus, BeaconOnNewPayloadError> {
let (tx, rx) = oneshot::channel();
let _ = self.to_engine.send(BeaconEngineMessage::NewPayload { payload, tx });
rx.await.map_err(|_| BeaconOnNewPayloadError::EngineUnavailable)?
}
/// Sends a forkchoice update message to the beacon consensus engine and waits for a response.
///
/// See also <https://github.com/ethereum/execution-apis/blob/3d627c95a4d3510a8187dd02e0250ecb4331d27e/src/engine/shanghai.md#engine_forkchoiceupdatedv2>
pub async fn fork_choice_updated(
&self,
state: ForkchoiceState,
payload_attrs: Option<PayloadAttributes>,
) -> Result<ForkchoiceUpdated, BeaconForkChoiceUpdateError> {
Ok(self
.send_fork_choice_updated(state, payload_attrs)
.map_err(|_| BeaconForkChoiceUpdateError::EngineUnavailable)
.await??
.await?)
}
/// Sends a forkchoice update message to the beacon consensus engine and returns the receiver to
/// wait for a response.
fn send_fork_choice_updated(
&self,
state: ForkchoiceState,
payload_attrs: Option<PayloadAttributes>,
) -> oneshot::Receiver<Result<OnForkChoiceUpdated, reth_interfaces::Error>> {
let (tx, rx) = oneshot::channel();
let _ = self.to_engine.send(BeaconEngineMessage::ForkchoiceUpdated {
state,
payload_attrs,
tx,
});
rx
}
/// Sends a transition configuration exchagne message to the beacon consensus engine.
///
/// See also <https://github.com/ethereum/execution-apis/blob/3d627c95a4d3510a8187dd02e0250ecb4331d27e/src/engine/paris.md#engine_exchangetransitionconfigurationv1>
pub async fn transition_configuration_exchanged(&self) {
let _ = self.to_engine.send(BeaconEngineMessage::TransitionConfigurationExchanged);
}
/// Creates a new [`BeaconConsensusEngineEvent`] listener stream.
pub fn event_listener(&self) -> UnboundedReceiverStream<BeaconConsensusEngineEvent> {
let (tx, rx) = mpsc::unbounded_channel();
let _ = self.to_engine.send(BeaconEngineMessage::EventListener(tx));
UnboundedReceiverStream::new(rx)
}
}
/// The beacon consensus engine is the driver that switches between historical and live sync.
///
/// The beacon consensus engine is itself driven by messages from the Consensus Layer, which are
@@ -1493,7 +1420,7 @@ where
#[cfg(test)]
mod tests {
use super::*;
use crate::engine::error::BeaconForkChoiceUpdateError;
use crate::{BeaconForkChoiceUpdateError, BeaconOnNewPayloadError};
use assert_matches::assert_matches;
use reth_blockchain_tree::{
config::BlockchainTreeConfig, externals::TreeExternals, post_state::PostState,
@@ -1510,6 +1437,9 @@ mod tests {
providers::BlockchainProvider, test_utils::TestExecutorFactory, BlockWriter,
ProviderFactory,
};
use reth_rpc_types::engine::{
ExecutionPayload, ForkchoiceState, ForkchoiceUpdated, PayloadStatus,
};
use reth_stages::{test_utils::TestStages, ExecOutput, PipelineError, StageError};
use reth_tasks::TokioTaskExecutor;
use std::{collections::VecDeque, sync::Arc, time::Duration};