From d1162dbbb725f8747ec60260c23f539a06cf9bde Mon Sep 17 00:00:00 2001 From: Matthias Seitz Date: Sun, 16 Apr 2023 18:22:07 +0200 Subject: [PATCH] refactor: add beacon engine handle (#2266) --- bin/reth/src/node/mod.rs | 7 +- crates/consensus/beacon/src/engine/error.rs | 3 + crates/consensus/beacon/src/engine/mod.rs | 303 +++++++++++++------- crates/rpc/rpc-engine-api/src/engine_api.rs | 39 +-- 4 files changed, 219 insertions(+), 133 deletions(-) diff --git a/bin/reth/src/node/mod.rs b/bin/reth/src/node/mod.rs index 0c0778ce89..c504a9be78 100644 --- a/bin/reth/src/node/mod.rs +++ b/bin/reth/src/node/mod.rs @@ -297,21 +297,22 @@ impl Command { debug!(target: "reth::cli", "Spawning payload builder service"); ctx.task_executor.spawn_critical("payload builder service", payload_service); - let beacon_consensus_engine = BeaconConsensusEngine::new( + let (beacon_consensus_engine, beacon_engine_handle) = BeaconConsensusEngine::with_channel( Arc::clone(&db), ctx.task_executor.clone(), pipeline, blockchain_tree.clone(), - consensus_engine_rx, self.debug.max_block, payload_builder.clone(), + consensus_engine_tx, + consensus_engine_rx, ); info!(target: "reth::cli", "Consensus engine initialized"); let engine_api = EngineApi::new( ShareableDatabase::new(db, self.chain.clone()), self.chain.clone(), - consensus_engine_tx.clone(), + beacon_engine_handle, payload_builder.into(), ); info!(target: "reth::cli", "Engine API handler initialized"); diff --git a/crates/consensus/beacon/src/engine/error.rs b/crates/consensus/beacon/src/engine/error.rs index 2c9f30edc4..8ce2726c25 100644 --- a/crates/consensus/beacon/src/engine/error.rs +++ b/crates/consensus/beacon/src/engine/error.rs @@ -30,6 +30,9 @@ pub enum BeaconEngineError { /// Common error. Wrapper around [reth_interfaces::Error]. #[error(transparent)] Common(#[from] reth_interfaces::Error), + /// Thrown when the engine task stopped + #[error("beacon consensus engine task stopped")] + EngineUnavailable, } // box the pipeline error as it is a large enum. diff --git a/crates/consensus/beacon/src/engine/mod.rs b/crates/consensus/beacon/src/engine/mod.rs index e43d775806..41e7408018 100644 --- a/crates/consensus/beacon/src/engine/mod.rs +++ b/crates/consensus/beacon/src/engine/mod.rs @@ -21,20 +21,83 @@ use std::{ sync::Arc, task::{Context, Poll}, }; -use tokio::sync::{mpsc::UnboundedReceiver, oneshot}; +use tokio::sync::{ + mpsc, + mpsc::{UnboundedReceiver, UnboundedSender}, + oneshot, +}; use tokio_stream::wrappers::UnboundedReceiverStream; use tracing::*; -mod error; -pub use error::{BeaconEngineError, BeaconEngineResult}; - mod message; pub use message::BeaconEngineMessage; +mod error; +pub use error::{BeaconEngineError, BeaconEngineResult}; + mod metrics; mod pipeline_state; pub use pipeline_state::PipelineState; +/// A _shareable_ beacon consensus frontend. Used to interact with the spawned beacon consensus +/// engine. +/// +/// See also [`BeaconConsensusEngine`]. +#[derive(Clone, Debug)] +pub struct BeaconConsensusEngineHandle { + to_engine: UnboundedSender, +} + +// === impl BeaconConsensusEngineHandle === + +impl BeaconConsensusEngineHandle { + /// Creates a new beacon consensus engine handle. + pub fn new(to_engine: UnboundedSender) -> Self { + Self { to_engine } + } + + /// Sends a new payload message to the beacon consensus engine and waits for a response. + /// + ///See also + pub async fn new_payload( + &self, + payload: ExecutionPayload, + ) -> BeaconEngineResult { + let (tx, rx) = oneshot::channel(); + let _ = self.to_engine.send(BeaconEngineMessage::NewPayload { payload, tx }); + rx.await.map_err(|_| BeaconEngineError::EngineUnavailable)? + } + + /// Sends a forkchoice update message to the beacon consensus engine and waits for a response. + /// + /// See also + pub async fn fork_choice_updated( + &self, + state: ForkchoiceState, + payload_attrs: Option, + ) -> BeaconEngineResult { + self.send_fork_choice_updated(state, payload_attrs) + .await + .map_err(|_| BeaconEngineError::EngineUnavailable)? + } + + /// Sends a forkchoice update message to the beacon consensus engine and returns the receiver to + /// wait for a response. + pub fn send_fork_choice_updated( + &self, + state: ForkchoiceState, + payload_attrs: Option, + ) -> oneshot::Receiver> { + let (tx, rx) = oneshot::channel(); + let _ = self.to_engine.send(BeaconEngineMessage::ForkchoiceUpdated { + state, + payload_attrs, + tx, + }); + rx + } +} + /// The beacon consensus engine is the driver that switches between historical and live sync. /// /// The beacon consensus engine is itself driven by messages from the Consensus Layer, which are @@ -70,7 +133,9 @@ where /// The blockchain tree used for live sync and reorg tracking. blockchain_tree: BT, /// The Engine API message receiver. - message_rx: UnboundedReceiverStream, + engine_message_rx: UnboundedReceiverStream, + /// A clone of the handle + handle: BeaconConsensusEngineHandle, /// Current forkchoice state. The engine must receive the initial state in order to start /// syncing. forkchoice_state: Option, @@ -92,31 +157,65 @@ where U: SyncStateUpdater + 'static, BT: BlockchainTreeEngine + 'static, { - /// Create new instance of the [BeaconConsensusEngine]. - /// - /// The `message_rx` receiver is connected to the Engine API and is used to - /// handle the messages received from the Consensus Layer. + /// Create a new instance of the [BeaconConsensusEngine]. pub fn new( db: Arc, task_spawner: TS, pipeline: Pipeline, blockchain_tree: BT, - message_rx: UnboundedReceiver, max_block: Option, payload_builder: PayloadBuilderHandle, - ) -> Self { - Self { + ) -> (Self, BeaconConsensusEngineHandle) { + let (to_engine, rx) = mpsc::unbounded_channel(); + Self::with_channel( + db, + task_spawner, + pipeline, + blockchain_tree, + max_block, + payload_builder, + to_engine, + rx, + ) + } + + /// Create a new instance of the [BeaconConsensusEngine] using the given channel to configure + /// the [BeaconEngineMessage] communication channel. + #[allow(clippy::too_many_arguments)] + pub fn with_channel( + db: Arc, + task_spawner: TS, + pipeline: Pipeline, + blockchain_tree: BT, + max_block: Option, + payload_builder: PayloadBuilderHandle, + to_engine: UnboundedSender, + rx: UnboundedReceiver, + ) -> (Self, BeaconConsensusEngineHandle) { + let handle = BeaconConsensusEngineHandle { to_engine }; + let this = Self { db, task_spawner, pipeline_state: Some(PipelineState::Idle(pipeline)), blockchain_tree, - message_rx: UnboundedReceiverStream::new(message_rx), + engine_message_rx: UnboundedReceiverStream::new(rx), + handle: handle.clone(), forkchoice_state: None, next_action: BeaconEngineAction::None, max_block, payload_builder, metrics: Metrics::default(), - } + }; + + (this, handle) + } + + /// Returns a new [`BeaconConsensusEngineHandle`] that can be cloned and shared. + /// + /// The [`BeaconConsensusEngineHandle`] can be used to interact with this + /// [`BeaconConsensusEngine`] + pub fn handle(&self) -> BeaconConsensusEngineHandle { + self.handle.clone() } /// Returns `true` if the pipeline is currently idle. @@ -416,7 +515,7 @@ where // Set the next pipeline state. loop { // Process all incoming messages first. - while let Poll::Ready(Some(msg)) = this.message_rx.poll_next_unpin(cx) { + while let Poll::Ready(Some(msg)) = this.engine_message_rx.poll_next_unpin(cx) { match msg { BeaconEngineMessage::ForkchoiceUpdated { state, payload_attrs, tx } => { this.metrics.forkchoice_updated_messages.increment(1); @@ -559,7 +658,6 @@ mod tests { use reth_tasks::TokioTaskExecutor; use std::{collections::VecDeque, time::Duration}; use tokio::sync::{ - mpsc::{unbounded_channel, UnboundedSender}, oneshot::{self, error::TryRecvError}, watch, }; @@ -576,38 +674,30 @@ mod tests { // Keep the tip receiver around, so it's not dropped. #[allow(dead_code)] tip_rx: watch::Receiver, - sync_tx: UnboundedSender, + engine_handle: BeaconConsensusEngineHandle, } impl TestEnv { fn new( db: Arc, tip_rx: watch::Receiver, - sync_tx: UnboundedSender, + engine_handle: BeaconConsensusEngineHandle, ) -> Self { - Self { db, tip_rx, sync_tx } + Self { db, tip_rx, engine_handle } } - fn send_new_payload( + async fn send_new_payload( &self, payload: ExecutionPayload, - ) -> oneshot::Receiver> { - let (tx, rx) = oneshot::channel(); - self.sync_tx - .send(BeaconEngineMessage::NewPayload { payload, tx }) - .expect("failed to send msg"); - rx + ) -> BeaconEngineResult { + self.engine_handle.new_payload(payload).await } - fn send_forkchoice_updated( + async fn send_forkchoice_updated( &self, state: ForkchoiceState, - ) -> oneshot::Receiver> { - let (tx, rx) = oneshot::channel(); - self.sync_tx - .send(BeaconEngineMessage::ForkchoiceUpdated { state, payload_attrs: None, tx }) - .expect("failed to send msg"); - rx + ) -> BeaconEngineResult { + self.engine_handle.fork_choice_updated(state, None).await } } @@ -639,20 +729,16 @@ mod tests { BlockchainTree::new(externals, canon_state_notification_sender, config) .expect("failed to create tree"), ); + let (engine, handle) = BeaconConsensusEngine::new( + db.clone(), + TokioTaskExecutor::default(), + pipeline, + tree, + None, + payload_builder, + ); - let (sync_tx, sync_rx) = unbounded_channel(); - ( - BeaconConsensusEngine::new( - db.clone(), - TokioTaskExecutor::default(), - pipeline, - tree, - sync_rx, - None, - payload_builder, - ), - TestEnv::new(db, tip_rx, sync_tx), - ) + (engine, TestEnv::new(db, tip_rx, handle)) } fn spawn_consensus_engine( @@ -681,7 +767,7 @@ mod tests { VecDeque::from([Err(StageError::ChannelClosed)]), Vec::default(), ); - let rx = spawn_consensus_engine(consensus_engine); + let res = spawn_consensus_engine(consensus_engine); let _ = env .send_forkchoice_updated(ForkchoiceState { @@ -690,7 +776,7 @@ mod tests { }) .await; assert_matches!( - rx.await, + res.await, Ok(Err(BeaconEngineError::Pipeline(n))) if matches!(*n.as_ref(),PipelineError::Stage(StageError::ChannelClosed)) ); } @@ -827,11 +913,11 @@ mod tests { let mut engine_rx = spawn_consensus_engine(consensus_engine); - let rx = env.send_forkchoice_updated(ForkchoiceState::default()); + let res = env.send_forkchoice_updated(ForkchoiceState::default()).await; let expected_result = ForkchoiceUpdated::from_status(PayloadStatusEnum::Invalid { validation_error: BeaconEngineError::ForkchoiceEmptyHead.to_string(), }); - assert_matches!(rx.await, Ok(Ok(result)) => assert_eq!(result, expected_result)); + assert_matches!(res, Ok(result) => assert_eq!(result, expected_result)); assert_matches!(engine_rx.try_recv(), Err(TryRecvError::Empty)); } @@ -866,14 +952,14 @@ mod tests { let rx_invalid = env.send_forkchoice_updated(forkchoice); let expected_result = ForkchoiceUpdated::from_status(PayloadStatusEnum::Syncing); - assert_matches!(rx_invalid.await, Ok(Ok(result)) => assert_eq!(result, expected_result)); + assert_matches!(rx_invalid.await, Ok(result) => assert_eq!(result, expected_result)); - let rx_valid = env.send_forkchoice_updated(forkchoice); + let res = env.send_forkchoice_updated(forkchoice); let expected_result = ForkchoiceUpdated::new(PayloadStatus::new( PayloadStatusEnum::Valid, Some(block1.hash), )); - assert_matches!(rx_valid.await, Ok(Ok(result)) => assert_eq!(result, expected_result)); + assert_matches!(res.await, Ok(result) => assert_eq!(result, expected_result)); assert_matches!(engine_rx.try_recv(), Err(TryRecvError::Empty)); } @@ -915,12 +1001,12 @@ mod tests { insert_blocks(env.db.as_ref(), [&next_head].into_iter()); let expected_result = ForkchoiceUpdated::from_status(PayloadStatusEnum::Syncing); - assert_matches!(invalid_rx.await, Ok(Ok(result)) => assert_eq!(result, expected_result)); + assert_matches!(invalid_rx.await, Ok(result) => assert_eq!(result, expected_result)); let valid_rx = env.send_forkchoice_updated(next_forkchoice_state); let expected_result = ForkchoiceUpdated::from_status(PayloadStatusEnum::Valid) .with_latest_valid_hash(next_head.hash); - assert_matches!(valid_rx.await, Ok(Ok(result)) => assert_eq!(result, expected_result)); + assert_matches!(valid_rx.await, Ok(result) => assert_eq!(result, expected_result)); assert_matches!(engine_rx.try_recv(), Err(TryRecvError::Empty)); } @@ -946,13 +1032,15 @@ mod tests { let engine = spawn_consensus_engine(consensus_engine); - let rx = env.send_forkchoice_updated(ForkchoiceState { - head_block_hash: H256::random(), - finalized_block_hash: block1.hash, - ..Default::default() - }); + let res = env + .send_forkchoice_updated(ForkchoiceState { + head_block_hash: H256::random(), + finalized_block_hash: block1.hash, + ..Default::default() + }) + .await; let expected_result = ForkchoiceUpdated::from_status(PayloadStatusEnum::Syncing); - assert_matches!(rx.await, Ok(Ok(result)) => assert_eq!(result, expected_result)); + assert_matches!(res, Ok(result) => assert_eq!(result, expected_result)); drop(engine); } @@ -979,27 +1067,30 @@ mod tests { insert_blocks(env.db.as_ref(), [&genesis, &block1].into_iter()); - let engine = spawn_consensus_engine(consensus_engine); + let _engine = spawn_consensus_engine(consensus_engine); - let rx = env.send_forkchoice_updated(ForkchoiceState { - head_block_hash: block1.hash, - finalized_block_hash: block1.hash, - ..Default::default() - }); + let res = env + .send_forkchoice_updated(ForkchoiceState { + head_block_hash: block1.hash, + finalized_block_hash: block1.hash, + ..Default::default() + }) + .await; let expected_result = ForkchoiceUpdated::from_status(PayloadStatusEnum::Syncing); - assert_matches!(rx.await, Ok(Ok(result)) => assert_eq!(result, expected_result)); + assert_matches!(res, Ok(result) => assert_eq!(result, expected_result)); - let rx = env.send_forkchoice_updated(ForkchoiceState { - head_block_hash: block1.hash, - finalized_block_hash: block1.hash, - ..Default::default() - }); + let res = env + .send_forkchoice_updated(ForkchoiceState { + head_block_hash: block1.hash, + finalized_block_hash: block1.hash, + ..Default::default() + }) + .await; let expected_result = ForkchoiceUpdated::from_status(PayloadStatusEnum::Invalid { validation_error: ExecutorError::BlockPreMerge { hash: block1.hash }.to_string(), }) .with_latest_valid_hash(H256::zero()); - assert_matches!(rx.await, Ok(Ok(result)) => assert_eq!(result, expected_result)); - drop(engine); + assert_matches!(res, Ok(result) => assert_eq!(result, expected_result)); } } @@ -1029,14 +1120,14 @@ mod tests { let mut engine_rx = spawn_consensus_engine(consensus_engine); // Send new payload - let rx = env.send_new_payload(random_block(0, None, None, Some(0)).into()); + let res = env.send_new_payload(random_block(0, None, None, Some(0)).into()).await; // Invalid, because this is a genesis block - assert_matches!(rx.await, Ok(Ok(result)) => assert_matches!(result.status, PayloadStatusEnum::Invalid { .. })); + assert_matches!(res, Ok(result) => assert_matches!(result.status, PayloadStatusEnum::Invalid { .. })); // Send new payload - let rx = env.send_new_payload(random_block(1, None, None, Some(0)).into()); + let res = env.send_new_payload(random_block(1, None, None, Some(0)).into()).await; let expected_result = PayloadStatus::from_status(PayloadStatusEnum::Syncing); - assert_matches!(rx.await, Ok(Ok(result)) => assert_eq!(result, expected_result)); + assert_matches!(res, Ok(result) => assert_eq!(result, expected_result)); assert_matches!(engine_rx.try_recv(), Err(TryRecvError::Empty)); } @@ -1064,20 +1155,22 @@ mod tests { let mut engine_rx = spawn_consensus_engine(consensus_engine); // Send forkchoice - let rx = env.send_forkchoice_updated(ForkchoiceState { - head_block_hash: block1.hash, - finalized_block_hash: block1.hash, - ..Default::default() - }); + let res = env + .send_forkchoice_updated(ForkchoiceState { + head_block_hash: block1.hash, + finalized_block_hash: block1.hash, + ..Default::default() + }) + .await; let expected_result = ForkchoiceUpdated::new(PayloadStatus::from_status(PayloadStatusEnum::Syncing)); - assert_matches!(rx.await, Ok(Ok(result)) => assert_eq!(result, expected_result)); + assert_matches!(res, Ok(result) => assert_eq!(result, expected_result)); // Send new payload - let rx = env.send_new_payload(block2.clone().into()); + let res = env.send_new_payload(block2.clone().into()).await; let expected_result = PayloadStatus::from_status(PayloadStatusEnum::Valid) .with_latest_valid_hash(block2.hash); - assert_matches!(rx.await, Ok(Ok(result)) => assert_eq!(result, expected_result)); + assert_matches!(res, Ok(result) => assert_eq!(result, expected_result)); assert_matches!(engine_rx.try_recv(), Err(TryRecvError::Empty)); } @@ -1104,20 +1197,22 @@ mod tests { let mut engine_rx = spawn_consensus_engine(consensus_engine); // Send forkchoice - let rx = env.send_forkchoice_updated(ForkchoiceState { - head_block_hash: genesis.hash, - finalized_block_hash: genesis.hash, - ..Default::default() - }); + let res = env + .send_forkchoice_updated(ForkchoiceState { + head_block_hash: genesis.hash, + finalized_block_hash: genesis.hash, + ..Default::default() + }) + .await; let expected_result = ForkchoiceUpdated::new(PayloadStatus::from_status(PayloadStatusEnum::Syncing)); - assert_matches!(rx.await, Ok(Ok(result)) => assert_eq!(result, expected_result)); + assert_matches!(res, Ok(result) => assert_eq!(result, expected_result)); // Send new payload let block = random_block(2, Some(H256::random()), None, Some(0)); - let rx = env.send_new_payload(block.into()); + let res = env.send_new_payload(block.into()).await; let expected_result = PayloadStatus::from_status(PayloadStatusEnum::Syncing); - assert_matches!(rx.await, Ok(Ok(result)) => assert_eq!(result, expected_result)); + assert_matches!(res, Ok(result) => assert_eq!(result, expected_result)); assert_matches!(engine_rx.try_recv(), Err(TryRecvError::Empty)); } @@ -1154,22 +1249,24 @@ mod tests { let mut engine_rx = spawn_consensus_engine(consensus_engine); // Send forkchoice - let rx = env.send_forkchoice_updated(ForkchoiceState { - head_block_hash: block1.hash, - finalized_block_hash: block1.hash, - ..Default::default() - }); + let res = env + .send_forkchoice_updated(ForkchoiceState { + head_block_hash: block1.hash, + finalized_block_hash: block1.hash, + ..Default::default() + }) + .await; let expected_result = ForkchoiceUpdated::new(PayloadStatus::from_status(PayloadStatusEnum::Syncing)); - assert_matches!(rx.await, Ok(Ok(result)) => assert_eq!(result, expected_result)); + assert_matches!(res, Ok(result) => assert_eq!(result, expected_result)); // Send new payload - let rx = env.send_new_payload(block2.clone().into()); + let res = env.send_new_payload(block2.clone().into()).await; let expected_result = PayloadStatus::from_status(PayloadStatusEnum::Invalid { validation_error: ExecutorError::BlockPreMerge { hash: block2.hash }.to_string(), }) .with_latest_valid_hash(H256::zero()); - assert_matches!(rx.await, Ok(Ok(result)) => assert_eq!(result, expected_result)); + assert_matches!(res, Ok(result) => assert_eq!(result, expected_result)); assert_matches!(engine_rx.try_recv(), Err(TryRecvError::Empty)); } diff --git a/crates/rpc/rpc-engine-api/src/engine_api.rs b/crates/rpc/rpc-engine-api/src/engine_api.rs index 197d873643..283a3f9953 100644 --- a/crates/rpc/rpc-engine-api/src/engine_api.rs +++ b/crates/rpc/rpc-engine-api/src/engine_api.rs @@ -1,7 +1,7 @@ use crate::{EngineApiError, EngineApiMessageVersion, EngineApiResult}; use async_trait::async_trait; use jsonrpsee_core::RpcResult as Result; -use reth_beacon_consensus::BeaconEngineMessage; +use reth_beacon_consensus::BeaconConsensusEngineHandle; use reth_interfaces::consensus::ForkchoiceState; use reth_payload_builder::PayloadStore; use reth_primitives::{BlockHash, BlockId, BlockNumber, ChainSpec, Hardfork, U64}; @@ -12,7 +12,7 @@ use reth_rpc_types::engine::{ PayloadAttributes, PayloadId, PayloadStatus, TransitionConfiguration, CAPABILITIES, }; use std::sync::Arc; -use tokio::sync::{mpsc::UnboundedSender, oneshot}; +use tokio::sync::oneshot; use tracing::trace; /// The Engine API response sender. @@ -29,7 +29,7 @@ pub struct EngineApi { /// Consensus configuration chain_spec: Arc, /// The channel to send messages to the beacon consensus engine. - to_beacon_consensus: UnboundedSender, + beacon_consensus: BeaconConsensusEngineHandle, /// The type that can communicate with the payload service to retrieve payloads. payload_store: PayloadStore, } @@ -42,10 +42,10 @@ where pub fn new( client: Client, chain_spec: Arc, - to_beacon_consensus: UnboundedSender, + beacon_consensus: BeaconConsensusEngineHandle, payload_store: PayloadStore, ) -> Self { - Self { client, chain_spec, to_beacon_consensus, payload_store } + Self { client, chain_spec, beacon_consensus, payload_store } } /// See also @@ -59,9 +59,7 @@ where payload.timestamp.as_u64(), payload.withdrawals.is_some(), )?; - let (tx, rx) = oneshot::channel(); - self.to_beacon_consensus.send(BeaconEngineMessage::NewPayload { payload, tx })?; - Ok(rx.await??) + Ok(self.beacon_consensus.new_payload(payload).await?) } /// See also @@ -74,9 +72,7 @@ where payload.timestamp.as_u64(), payload.withdrawals.is_some(), )?; - let (tx, rx) = oneshot::channel(); - self.to_beacon_consensus.send(BeaconEngineMessage::NewPayload { payload, tx })?; - Ok(rx.await??) + Ok(self.beacon_consensus.new_payload(payload).await?) } /// Sends a message to the beacon consensus engine to update the fork choice _without_ @@ -97,13 +93,7 @@ where attrs.withdrawals.is_some(), )?; } - let (tx, rx) = oneshot::channel(); - self.to_beacon_consensus.send(BeaconEngineMessage::ForkchoiceUpdated { - state, - payload_attrs, - tx, - })?; - Ok(rx.await??) + Ok(self.beacon_consensus.fork_choice_updated(state, payload_attrs).await?) } /// Sends a message to the beacon consensus engine to update the fork choice _with_ withdrawals, @@ -122,13 +112,7 @@ where attrs.withdrawals.is_some(), )?; } - let (tx, rx) = oneshot::channel(); - self.to_beacon_consensus.send(BeaconEngineMessage::ForkchoiceUpdated { - state, - payload_attrs, - tx, - })?; - Ok(rx.await??) + Ok(self.beacon_consensus.fork_choice_updated(state, payload_attrs).await?) } /// Returns the most recent version of the payload that is available in the corresponding @@ -424,6 +408,7 @@ impl std::fmt::Debug for EngineApi { mod tests { use super::*; use assert_matches::assert_matches; + use reth_beacon_consensus::BeaconEngineMessage; use reth_interfaces::test_utils::generators::random_block; use reth_payload_builder::test_utils::spawn_test_payload_service; use reth_primitives::{SealedBlock, H256, MAINNET}; @@ -435,11 +420,11 @@ mod tests { let chain_spec = Arc::new(MAINNET.clone()); let client = Arc::new(MockEthProvider::default()); let payload_store = spawn_test_payload_service(); - let (to_beacon_consensus, engine_rx) = unbounded_channel(); + let (to_engine, engine_rx) = unbounded_channel(); let api = EngineApi::new( client.clone(), chain_spec.clone(), - to_beacon_consensus, + BeaconConsensusEngineHandle::new(to_engine), payload_store.into(), ); let handle = EngineApiTestHandle { chain_spec, client, from_api: engine_rx };