mirror of
https://github.com/paradigmxyz/reth.git
synced 2026-01-29 00:58:11 -05:00
refactor: add beacon engine handle (#2266)
This commit is contained in:
@@ -297,21 +297,22 @@ impl Command {
|
||||
debug!(target: "reth::cli", "Spawning payload builder service");
|
||||
ctx.task_executor.spawn_critical("payload builder service", payload_service);
|
||||
|
||||
let beacon_consensus_engine = BeaconConsensusEngine::new(
|
||||
let (beacon_consensus_engine, beacon_engine_handle) = BeaconConsensusEngine::with_channel(
|
||||
Arc::clone(&db),
|
||||
ctx.task_executor.clone(),
|
||||
pipeline,
|
||||
blockchain_tree.clone(),
|
||||
consensus_engine_rx,
|
||||
self.debug.max_block,
|
||||
payload_builder.clone(),
|
||||
consensus_engine_tx,
|
||||
consensus_engine_rx,
|
||||
);
|
||||
info!(target: "reth::cli", "Consensus engine initialized");
|
||||
|
||||
let engine_api = EngineApi::new(
|
||||
ShareableDatabase::new(db, self.chain.clone()),
|
||||
self.chain.clone(),
|
||||
consensus_engine_tx.clone(),
|
||||
beacon_engine_handle,
|
||||
payload_builder.into(),
|
||||
);
|
||||
info!(target: "reth::cli", "Engine API handler initialized");
|
||||
|
||||
@@ -30,6 +30,9 @@ pub enum BeaconEngineError {
|
||||
/// Common error. Wrapper around [reth_interfaces::Error].
|
||||
#[error(transparent)]
|
||||
Common(#[from] reth_interfaces::Error),
|
||||
/// Thrown when the engine task stopped
|
||||
#[error("beacon consensus engine task stopped")]
|
||||
EngineUnavailable,
|
||||
}
|
||||
|
||||
// box the pipeline error as it is a large enum.
|
||||
|
||||
@@ -21,20 +21,83 @@ use std::{
|
||||
sync::Arc,
|
||||
task::{Context, Poll},
|
||||
};
|
||||
use tokio::sync::{mpsc::UnboundedReceiver, oneshot};
|
||||
use tokio::sync::{
|
||||
mpsc,
|
||||
mpsc::{UnboundedReceiver, UnboundedSender},
|
||||
oneshot,
|
||||
};
|
||||
use tokio_stream::wrappers::UnboundedReceiverStream;
|
||||
use tracing::*;
|
||||
|
||||
mod error;
|
||||
pub use error::{BeaconEngineError, BeaconEngineResult};
|
||||
|
||||
mod message;
|
||||
pub use message::BeaconEngineMessage;
|
||||
|
||||
mod error;
|
||||
pub use error::{BeaconEngineError, BeaconEngineResult};
|
||||
|
||||
mod metrics;
|
||||
mod pipeline_state;
|
||||
pub use pipeline_state::PipelineState;
|
||||
|
||||
/// A _shareable_ beacon consensus frontend. Used to interact with the spawned beacon consensus
|
||||
/// engine.
|
||||
///
|
||||
/// See also [`BeaconConsensusEngine`].
|
||||
#[derive(Clone, Debug)]
|
||||
pub struct BeaconConsensusEngineHandle {
|
||||
to_engine: UnboundedSender<BeaconEngineMessage>,
|
||||
}
|
||||
|
||||
// === impl BeaconConsensusEngineHandle ===
|
||||
|
||||
impl BeaconConsensusEngineHandle {
|
||||
/// Creates a new beacon consensus engine handle.
|
||||
pub fn new(to_engine: UnboundedSender<BeaconEngineMessage>) -> Self {
|
||||
Self { to_engine }
|
||||
}
|
||||
|
||||
/// Sends a new payload message to the beacon consensus engine and waits for a response.
|
||||
///
|
||||
///See also <https://github.com/ethereum/execution-apis/blob/8db51dcd2f4bdfbd9ad6e4a7560aac97010ad063/src/engine/specification.md#engine_newpayloadv2>
|
||||
pub async fn new_payload(
|
||||
&self,
|
||||
payload: ExecutionPayload,
|
||||
) -> BeaconEngineResult<PayloadStatus> {
|
||||
let (tx, rx) = oneshot::channel();
|
||||
let _ = self.to_engine.send(BeaconEngineMessage::NewPayload { payload, tx });
|
||||
rx.await.map_err(|_| BeaconEngineError::EngineUnavailable)?
|
||||
}
|
||||
|
||||
/// Sends a forkchoice update message to the beacon consensus engine and waits for a response.
|
||||
///
|
||||
/// See also <https://github.com/ethereum/execution-apis/blob/main/src/engine/specification.md#engine_forkchoiceupdatedv2>
|
||||
pub async fn fork_choice_updated(
|
||||
&self,
|
||||
state: ForkchoiceState,
|
||||
payload_attrs: Option<PayloadAttributes>,
|
||||
) -> BeaconEngineResult<ForkchoiceUpdated> {
|
||||
self.send_fork_choice_updated(state, payload_attrs)
|
||||
.await
|
||||
.map_err(|_| BeaconEngineError::EngineUnavailable)?
|
||||
}
|
||||
|
||||
/// Sends a forkchoice update message to the beacon consensus engine and returns the receiver to
|
||||
/// wait for a response.
|
||||
pub fn send_fork_choice_updated(
|
||||
&self,
|
||||
state: ForkchoiceState,
|
||||
payload_attrs: Option<PayloadAttributes>,
|
||||
) -> oneshot::Receiver<BeaconEngineResult<ForkchoiceUpdated>> {
|
||||
let (tx, rx) = oneshot::channel();
|
||||
let _ = self.to_engine.send(BeaconEngineMessage::ForkchoiceUpdated {
|
||||
state,
|
||||
payload_attrs,
|
||||
tx,
|
||||
});
|
||||
rx
|
||||
}
|
||||
}
|
||||
|
||||
/// The beacon consensus engine is the driver that switches between historical and live sync.
|
||||
///
|
||||
/// The beacon consensus engine is itself driven by messages from the Consensus Layer, which are
|
||||
@@ -70,7 +133,9 @@ where
|
||||
/// The blockchain tree used for live sync and reorg tracking.
|
||||
blockchain_tree: BT,
|
||||
/// The Engine API message receiver.
|
||||
message_rx: UnboundedReceiverStream<BeaconEngineMessage>,
|
||||
engine_message_rx: UnboundedReceiverStream<BeaconEngineMessage>,
|
||||
/// A clone of the handle
|
||||
handle: BeaconConsensusEngineHandle,
|
||||
/// Current forkchoice state. The engine must receive the initial state in order to start
|
||||
/// syncing.
|
||||
forkchoice_state: Option<ForkchoiceState>,
|
||||
@@ -92,31 +157,65 @@ where
|
||||
U: SyncStateUpdater + 'static,
|
||||
BT: BlockchainTreeEngine + 'static,
|
||||
{
|
||||
/// Create new instance of the [BeaconConsensusEngine].
|
||||
///
|
||||
/// The `message_rx` receiver is connected to the Engine API and is used to
|
||||
/// handle the messages received from the Consensus Layer.
|
||||
/// Create a new instance of the [BeaconConsensusEngine].
|
||||
pub fn new(
|
||||
db: Arc<DB>,
|
||||
task_spawner: TS,
|
||||
pipeline: Pipeline<DB, U>,
|
||||
blockchain_tree: BT,
|
||||
message_rx: UnboundedReceiver<BeaconEngineMessage>,
|
||||
max_block: Option<BlockNumber>,
|
||||
payload_builder: PayloadBuilderHandle,
|
||||
) -> Self {
|
||||
Self {
|
||||
) -> (Self, BeaconConsensusEngineHandle) {
|
||||
let (to_engine, rx) = mpsc::unbounded_channel();
|
||||
Self::with_channel(
|
||||
db,
|
||||
task_spawner,
|
||||
pipeline,
|
||||
blockchain_tree,
|
||||
max_block,
|
||||
payload_builder,
|
||||
to_engine,
|
||||
rx,
|
||||
)
|
||||
}
|
||||
|
||||
/// Create a new instance of the [BeaconConsensusEngine] using the given channel to configure
|
||||
/// the [BeaconEngineMessage] communication channel.
|
||||
#[allow(clippy::too_many_arguments)]
|
||||
pub fn with_channel(
|
||||
db: Arc<DB>,
|
||||
task_spawner: TS,
|
||||
pipeline: Pipeline<DB, U>,
|
||||
blockchain_tree: BT,
|
||||
max_block: Option<BlockNumber>,
|
||||
payload_builder: PayloadBuilderHandle,
|
||||
to_engine: UnboundedSender<BeaconEngineMessage>,
|
||||
rx: UnboundedReceiver<BeaconEngineMessage>,
|
||||
) -> (Self, BeaconConsensusEngineHandle) {
|
||||
let handle = BeaconConsensusEngineHandle { to_engine };
|
||||
let this = Self {
|
||||
db,
|
||||
task_spawner,
|
||||
pipeline_state: Some(PipelineState::Idle(pipeline)),
|
||||
blockchain_tree,
|
||||
message_rx: UnboundedReceiverStream::new(message_rx),
|
||||
engine_message_rx: UnboundedReceiverStream::new(rx),
|
||||
handle: handle.clone(),
|
||||
forkchoice_state: None,
|
||||
next_action: BeaconEngineAction::None,
|
||||
max_block,
|
||||
payload_builder,
|
||||
metrics: Metrics::default(),
|
||||
}
|
||||
};
|
||||
|
||||
(this, handle)
|
||||
}
|
||||
|
||||
/// Returns a new [`BeaconConsensusEngineHandle`] that can be cloned and shared.
|
||||
///
|
||||
/// The [`BeaconConsensusEngineHandle`] can be used to interact with this
|
||||
/// [`BeaconConsensusEngine`]
|
||||
pub fn handle(&self) -> BeaconConsensusEngineHandle {
|
||||
self.handle.clone()
|
||||
}
|
||||
|
||||
/// Returns `true` if the pipeline is currently idle.
|
||||
@@ -416,7 +515,7 @@ where
|
||||
// Set the next pipeline state.
|
||||
loop {
|
||||
// Process all incoming messages first.
|
||||
while let Poll::Ready(Some(msg)) = this.message_rx.poll_next_unpin(cx) {
|
||||
while let Poll::Ready(Some(msg)) = this.engine_message_rx.poll_next_unpin(cx) {
|
||||
match msg {
|
||||
BeaconEngineMessage::ForkchoiceUpdated { state, payload_attrs, tx } => {
|
||||
this.metrics.forkchoice_updated_messages.increment(1);
|
||||
@@ -559,7 +658,6 @@ mod tests {
|
||||
use reth_tasks::TokioTaskExecutor;
|
||||
use std::{collections::VecDeque, time::Duration};
|
||||
use tokio::sync::{
|
||||
mpsc::{unbounded_channel, UnboundedSender},
|
||||
oneshot::{self, error::TryRecvError},
|
||||
watch,
|
||||
};
|
||||
@@ -576,38 +674,30 @@ mod tests {
|
||||
// Keep the tip receiver around, so it's not dropped.
|
||||
#[allow(dead_code)]
|
||||
tip_rx: watch::Receiver<H256>,
|
||||
sync_tx: UnboundedSender<BeaconEngineMessage>,
|
||||
engine_handle: BeaconConsensusEngineHandle,
|
||||
}
|
||||
|
||||
impl<DB> TestEnv<DB> {
|
||||
fn new(
|
||||
db: Arc<DB>,
|
||||
tip_rx: watch::Receiver<H256>,
|
||||
sync_tx: UnboundedSender<BeaconEngineMessage>,
|
||||
engine_handle: BeaconConsensusEngineHandle,
|
||||
) -> Self {
|
||||
Self { db, tip_rx, sync_tx }
|
||||
Self { db, tip_rx, engine_handle }
|
||||
}
|
||||
|
||||
fn send_new_payload(
|
||||
async fn send_new_payload(
|
||||
&self,
|
||||
payload: ExecutionPayload,
|
||||
) -> oneshot::Receiver<BeaconEngineResult<PayloadStatus>> {
|
||||
let (tx, rx) = oneshot::channel();
|
||||
self.sync_tx
|
||||
.send(BeaconEngineMessage::NewPayload { payload, tx })
|
||||
.expect("failed to send msg");
|
||||
rx
|
||||
) -> BeaconEngineResult<PayloadStatus> {
|
||||
self.engine_handle.new_payload(payload).await
|
||||
}
|
||||
|
||||
fn send_forkchoice_updated(
|
||||
async fn send_forkchoice_updated(
|
||||
&self,
|
||||
state: ForkchoiceState,
|
||||
) -> oneshot::Receiver<BeaconEngineResult<ForkchoiceUpdated>> {
|
||||
let (tx, rx) = oneshot::channel();
|
||||
self.sync_tx
|
||||
.send(BeaconEngineMessage::ForkchoiceUpdated { state, payload_attrs: None, tx })
|
||||
.expect("failed to send msg");
|
||||
rx
|
||||
) -> BeaconEngineResult<ForkchoiceUpdated> {
|
||||
self.engine_handle.fork_choice_updated(state, None).await
|
||||
}
|
||||
}
|
||||
|
||||
@@ -639,20 +729,16 @@ mod tests {
|
||||
BlockchainTree::new(externals, canon_state_notification_sender, config)
|
||||
.expect("failed to create tree"),
|
||||
);
|
||||
let (engine, handle) = BeaconConsensusEngine::new(
|
||||
db.clone(),
|
||||
TokioTaskExecutor::default(),
|
||||
pipeline,
|
||||
tree,
|
||||
None,
|
||||
payload_builder,
|
||||
);
|
||||
|
||||
let (sync_tx, sync_rx) = unbounded_channel();
|
||||
(
|
||||
BeaconConsensusEngine::new(
|
||||
db.clone(),
|
||||
TokioTaskExecutor::default(),
|
||||
pipeline,
|
||||
tree,
|
||||
sync_rx,
|
||||
None,
|
||||
payload_builder,
|
||||
),
|
||||
TestEnv::new(db, tip_rx, sync_tx),
|
||||
)
|
||||
(engine, TestEnv::new(db, tip_rx, handle))
|
||||
}
|
||||
|
||||
fn spawn_consensus_engine(
|
||||
@@ -681,7 +767,7 @@ mod tests {
|
||||
VecDeque::from([Err(StageError::ChannelClosed)]),
|
||||
Vec::default(),
|
||||
);
|
||||
let rx = spawn_consensus_engine(consensus_engine);
|
||||
let res = spawn_consensus_engine(consensus_engine);
|
||||
|
||||
let _ = env
|
||||
.send_forkchoice_updated(ForkchoiceState {
|
||||
@@ -690,7 +776,7 @@ mod tests {
|
||||
})
|
||||
.await;
|
||||
assert_matches!(
|
||||
rx.await,
|
||||
res.await,
|
||||
Ok(Err(BeaconEngineError::Pipeline(n))) if matches!(*n.as_ref(),PipelineError::Stage(StageError::ChannelClosed))
|
||||
);
|
||||
}
|
||||
@@ -827,11 +913,11 @@ mod tests {
|
||||
|
||||
let mut engine_rx = spawn_consensus_engine(consensus_engine);
|
||||
|
||||
let rx = env.send_forkchoice_updated(ForkchoiceState::default());
|
||||
let res = env.send_forkchoice_updated(ForkchoiceState::default()).await;
|
||||
let expected_result = ForkchoiceUpdated::from_status(PayloadStatusEnum::Invalid {
|
||||
validation_error: BeaconEngineError::ForkchoiceEmptyHead.to_string(),
|
||||
});
|
||||
assert_matches!(rx.await, Ok(Ok(result)) => assert_eq!(result, expected_result));
|
||||
assert_matches!(res, Ok(result) => assert_eq!(result, expected_result));
|
||||
|
||||
assert_matches!(engine_rx.try_recv(), Err(TryRecvError::Empty));
|
||||
}
|
||||
@@ -866,14 +952,14 @@ mod tests {
|
||||
|
||||
let rx_invalid = env.send_forkchoice_updated(forkchoice);
|
||||
let expected_result = ForkchoiceUpdated::from_status(PayloadStatusEnum::Syncing);
|
||||
assert_matches!(rx_invalid.await, Ok(Ok(result)) => assert_eq!(result, expected_result));
|
||||
assert_matches!(rx_invalid.await, Ok(result) => assert_eq!(result, expected_result));
|
||||
|
||||
let rx_valid = env.send_forkchoice_updated(forkchoice);
|
||||
let res = env.send_forkchoice_updated(forkchoice);
|
||||
let expected_result = ForkchoiceUpdated::new(PayloadStatus::new(
|
||||
PayloadStatusEnum::Valid,
|
||||
Some(block1.hash),
|
||||
));
|
||||
assert_matches!(rx_valid.await, Ok(Ok(result)) => assert_eq!(result, expected_result));
|
||||
assert_matches!(res.await, Ok(result) => assert_eq!(result, expected_result));
|
||||
|
||||
assert_matches!(engine_rx.try_recv(), Err(TryRecvError::Empty));
|
||||
}
|
||||
@@ -915,12 +1001,12 @@ mod tests {
|
||||
insert_blocks(env.db.as_ref(), [&next_head].into_iter());
|
||||
|
||||
let expected_result = ForkchoiceUpdated::from_status(PayloadStatusEnum::Syncing);
|
||||
assert_matches!(invalid_rx.await, Ok(Ok(result)) => assert_eq!(result, expected_result));
|
||||
assert_matches!(invalid_rx.await, Ok(result) => assert_eq!(result, expected_result));
|
||||
|
||||
let valid_rx = env.send_forkchoice_updated(next_forkchoice_state);
|
||||
let expected_result = ForkchoiceUpdated::from_status(PayloadStatusEnum::Valid)
|
||||
.with_latest_valid_hash(next_head.hash);
|
||||
assert_matches!(valid_rx.await, Ok(Ok(result)) => assert_eq!(result, expected_result));
|
||||
assert_matches!(valid_rx.await, Ok(result) => assert_eq!(result, expected_result));
|
||||
|
||||
assert_matches!(engine_rx.try_recv(), Err(TryRecvError::Empty));
|
||||
}
|
||||
@@ -946,13 +1032,15 @@ mod tests {
|
||||
|
||||
let engine = spawn_consensus_engine(consensus_engine);
|
||||
|
||||
let rx = env.send_forkchoice_updated(ForkchoiceState {
|
||||
head_block_hash: H256::random(),
|
||||
finalized_block_hash: block1.hash,
|
||||
..Default::default()
|
||||
});
|
||||
let res = env
|
||||
.send_forkchoice_updated(ForkchoiceState {
|
||||
head_block_hash: H256::random(),
|
||||
finalized_block_hash: block1.hash,
|
||||
..Default::default()
|
||||
})
|
||||
.await;
|
||||
let expected_result = ForkchoiceUpdated::from_status(PayloadStatusEnum::Syncing);
|
||||
assert_matches!(rx.await, Ok(Ok(result)) => assert_eq!(result, expected_result));
|
||||
assert_matches!(res, Ok(result) => assert_eq!(result, expected_result));
|
||||
drop(engine);
|
||||
}
|
||||
|
||||
@@ -979,27 +1067,30 @@ mod tests {
|
||||
|
||||
insert_blocks(env.db.as_ref(), [&genesis, &block1].into_iter());
|
||||
|
||||
let engine = spawn_consensus_engine(consensus_engine);
|
||||
let _engine = spawn_consensus_engine(consensus_engine);
|
||||
|
||||
let rx = env.send_forkchoice_updated(ForkchoiceState {
|
||||
head_block_hash: block1.hash,
|
||||
finalized_block_hash: block1.hash,
|
||||
..Default::default()
|
||||
});
|
||||
let res = env
|
||||
.send_forkchoice_updated(ForkchoiceState {
|
||||
head_block_hash: block1.hash,
|
||||
finalized_block_hash: block1.hash,
|
||||
..Default::default()
|
||||
})
|
||||
.await;
|
||||
let expected_result = ForkchoiceUpdated::from_status(PayloadStatusEnum::Syncing);
|
||||
assert_matches!(rx.await, Ok(Ok(result)) => assert_eq!(result, expected_result));
|
||||
assert_matches!(res, Ok(result) => assert_eq!(result, expected_result));
|
||||
|
||||
let rx = env.send_forkchoice_updated(ForkchoiceState {
|
||||
head_block_hash: block1.hash,
|
||||
finalized_block_hash: block1.hash,
|
||||
..Default::default()
|
||||
});
|
||||
let res = env
|
||||
.send_forkchoice_updated(ForkchoiceState {
|
||||
head_block_hash: block1.hash,
|
||||
finalized_block_hash: block1.hash,
|
||||
..Default::default()
|
||||
})
|
||||
.await;
|
||||
let expected_result = ForkchoiceUpdated::from_status(PayloadStatusEnum::Invalid {
|
||||
validation_error: ExecutorError::BlockPreMerge { hash: block1.hash }.to_string(),
|
||||
})
|
||||
.with_latest_valid_hash(H256::zero());
|
||||
assert_matches!(rx.await, Ok(Ok(result)) => assert_eq!(result, expected_result));
|
||||
drop(engine);
|
||||
assert_matches!(res, Ok(result) => assert_eq!(result, expected_result));
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1029,14 +1120,14 @@ mod tests {
|
||||
let mut engine_rx = spawn_consensus_engine(consensus_engine);
|
||||
|
||||
// Send new payload
|
||||
let rx = env.send_new_payload(random_block(0, None, None, Some(0)).into());
|
||||
let res = env.send_new_payload(random_block(0, None, None, Some(0)).into()).await;
|
||||
// Invalid, because this is a genesis block
|
||||
assert_matches!(rx.await, Ok(Ok(result)) => assert_matches!(result.status, PayloadStatusEnum::Invalid { .. }));
|
||||
assert_matches!(res, Ok(result) => assert_matches!(result.status, PayloadStatusEnum::Invalid { .. }));
|
||||
|
||||
// Send new payload
|
||||
let rx = env.send_new_payload(random_block(1, None, None, Some(0)).into());
|
||||
let res = env.send_new_payload(random_block(1, None, None, Some(0)).into()).await;
|
||||
let expected_result = PayloadStatus::from_status(PayloadStatusEnum::Syncing);
|
||||
assert_matches!(rx.await, Ok(Ok(result)) => assert_eq!(result, expected_result));
|
||||
assert_matches!(res, Ok(result) => assert_eq!(result, expected_result));
|
||||
|
||||
assert_matches!(engine_rx.try_recv(), Err(TryRecvError::Empty));
|
||||
}
|
||||
@@ -1064,20 +1155,22 @@ mod tests {
|
||||
let mut engine_rx = spawn_consensus_engine(consensus_engine);
|
||||
|
||||
// Send forkchoice
|
||||
let rx = env.send_forkchoice_updated(ForkchoiceState {
|
||||
head_block_hash: block1.hash,
|
||||
finalized_block_hash: block1.hash,
|
||||
..Default::default()
|
||||
});
|
||||
let res = env
|
||||
.send_forkchoice_updated(ForkchoiceState {
|
||||
head_block_hash: block1.hash,
|
||||
finalized_block_hash: block1.hash,
|
||||
..Default::default()
|
||||
})
|
||||
.await;
|
||||
let expected_result =
|
||||
ForkchoiceUpdated::new(PayloadStatus::from_status(PayloadStatusEnum::Syncing));
|
||||
assert_matches!(rx.await, Ok(Ok(result)) => assert_eq!(result, expected_result));
|
||||
assert_matches!(res, Ok(result) => assert_eq!(result, expected_result));
|
||||
|
||||
// Send new payload
|
||||
let rx = env.send_new_payload(block2.clone().into());
|
||||
let res = env.send_new_payload(block2.clone().into()).await;
|
||||
let expected_result = PayloadStatus::from_status(PayloadStatusEnum::Valid)
|
||||
.with_latest_valid_hash(block2.hash);
|
||||
assert_matches!(rx.await, Ok(Ok(result)) => assert_eq!(result, expected_result));
|
||||
assert_matches!(res, Ok(result) => assert_eq!(result, expected_result));
|
||||
|
||||
assert_matches!(engine_rx.try_recv(), Err(TryRecvError::Empty));
|
||||
}
|
||||
@@ -1104,20 +1197,22 @@ mod tests {
|
||||
let mut engine_rx = spawn_consensus_engine(consensus_engine);
|
||||
|
||||
// Send forkchoice
|
||||
let rx = env.send_forkchoice_updated(ForkchoiceState {
|
||||
head_block_hash: genesis.hash,
|
||||
finalized_block_hash: genesis.hash,
|
||||
..Default::default()
|
||||
});
|
||||
let res = env
|
||||
.send_forkchoice_updated(ForkchoiceState {
|
||||
head_block_hash: genesis.hash,
|
||||
finalized_block_hash: genesis.hash,
|
||||
..Default::default()
|
||||
})
|
||||
.await;
|
||||
let expected_result =
|
||||
ForkchoiceUpdated::new(PayloadStatus::from_status(PayloadStatusEnum::Syncing));
|
||||
assert_matches!(rx.await, Ok(Ok(result)) => assert_eq!(result, expected_result));
|
||||
assert_matches!(res, Ok(result) => assert_eq!(result, expected_result));
|
||||
|
||||
// Send new payload
|
||||
let block = random_block(2, Some(H256::random()), None, Some(0));
|
||||
let rx = env.send_new_payload(block.into());
|
||||
let res = env.send_new_payload(block.into()).await;
|
||||
let expected_result = PayloadStatus::from_status(PayloadStatusEnum::Syncing);
|
||||
assert_matches!(rx.await, Ok(Ok(result)) => assert_eq!(result, expected_result));
|
||||
assert_matches!(res, Ok(result) => assert_eq!(result, expected_result));
|
||||
|
||||
assert_matches!(engine_rx.try_recv(), Err(TryRecvError::Empty));
|
||||
}
|
||||
@@ -1154,22 +1249,24 @@ mod tests {
|
||||
let mut engine_rx = spawn_consensus_engine(consensus_engine);
|
||||
|
||||
// Send forkchoice
|
||||
let rx = env.send_forkchoice_updated(ForkchoiceState {
|
||||
head_block_hash: block1.hash,
|
||||
finalized_block_hash: block1.hash,
|
||||
..Default::default()
|
||||
});
|
||||
let res = env
|
||||
.send_forkchoice_updated(ForkchoiceState {
|
||||
head_block_hash: block1.hash,
|
||||
finalized_block_hash: block1.hash,
|
||||
..Default::default()
|
||||
})
|
||||
.await;
|
||||
let expected_result =
|
||||
ForkchoiceUpdated::new(PayloadStatus::from_status(PayloadStatusEnum::Syncing));
|
||||
assert_matches!(rx.await, Ok(Ok(result)) => assert_eq!(result, expected_result));
|
||||
assert_matches!(res, Ok(result) => assert_eq!(result, expected_result));
|
||||
|
||||
// Send new payload
|
||||
let rx = env.send_new_payload(block2.clone().into());
|
||||
let res = env.send_new_payload(block2.clone().into()).await;
|
||||
let expected_result = PayloadStatus::from_status(PayloadStatusEnum::Invalid {
|
||||
validation_error: ExecutorError::BlockPreMerge { hash: block2.hash }.to_string(),
|
||||
})
|
||||
.with_latest_valid_hash(H256::zero());
|
||||
assert_matches!(rx.await, Ok(Ok(result)) => assert_eq!(result, expected_result));
|
||||
assert_matches!(res, Ok(result) => assert_eq!(result, expected_result));
|
||||
|
||||
assert_matches!(engine_rx.try_recv(), Err(TryRecvError::Empty));
|
||||
}
|
||||
|
||||
@@ -1,7 +1,7 @@
|
||||
use crate::{EngineApiError, EngineApiMessageVersion, EngineApiResult};
|
||||
use async_trait::async_trait;
|
||||
use jsonrpsee_core::RpcResult as Result;
|
||||
use reth_beacon_consensus::BeaconEngineMessage;
|
||||
use reth_beacon_consensus::BeaconConsensusEngineHandle;
|
||||
use reth_interfaces::consensus::ForkchoiceState;
|
||||
use reth_payload_builder::PayloadStore;
|
||||
use reth_primitives::{BlockHash, BlockId, BlockNumber, ChainSpec, Hardfork, U64};
|
||||
@@ -12,7 +12,7 @@ use reth_rpc_types::engine::{
|
||||
PayloadAttributes, PayloadId, PayloadStatus, TransitionConfiguration, CAPABILITIES,
|
||||
};
|
||||
use std::sync::Arc;
|
||||
use tokio::sync::{mpsc::UnboundedSender, oneshot};
|
||||
use tokio::sync::oneshot;
|
||||
use tracing::trace;
|
||||
|
||||
/// The Engine API response sender.
|
||||
@@ -29,7 +29,7 @@ pub struct EngineApi<Client> {
|
||||
/// Consensus configuration
|
||||
chain_spec: Arc<ChainSpec>,
|
||||
/// The channel to send messages to the beacon consensus engine.
|
||||
to_beacon_consensus: UnboundedSender<BeaconEngineMessage>,
|
||||
beacon_consensus: BeaconConsensusEngineHandle,
|
||||
/// The type that can communicate with the payload service to retrieve payloads.
|
||||
payload_store: PayloadStore,
|
||||
}
|
||||
@@ -42,10 +42,10 @@ where
|
||||
pub fn new(
|
||||
client: Client,
|
||||
chain_spec: Arc<ChainSpec>,
|
||||
to_beacon_consensus: UnboundedSender<BeaconEngineMessage>,
|
||||
beacon_consensus: BeaconConsensusEngineHandle,
|
||||
payload_store: PayloadStore,
|
||||
) -> Self {
|
||||
Self { client, chain_spec, to_beacon_consensus, payload_store }
|
||||
Self { client, chain_spec, beacon_consensus, payload_store }
|
||||
}
|
||||
|
||||
/// See also <https://github.com/ethereum/execution-apis/blob/8db51dcd2f4bdfbd9ad6e4a7560aac97010ad063/src/engine/specification.md#engine_newpayloadv1>
|
||||
@@ -59,9 +59,7 @@ where
|
||||
payload.timestamp.as_u64(),
|
||||
payload.withdrawals.is_some(),
|
||||
)?;
|
||||
let (tx, rx) = oneshot::channel();
|
||||
self.to_beacon_consensus.send(BeaconEngineMessage::NewPayload { payload, tx })?;
|
||||
Ok(rx.await??)
|
||||
Ok(self.beacon_consensus.new_payload(payload).await?)
|
||||
}
|
||||
|
||||
/// See also <https://github.com/ethereum/execution-apis/blob/8db51dcd2f4bdfbd9ad6e4a7560aac97010ad063/src/engine/specification.md#engine_newpayloadv1>
|
||||
@@ -74,9 +72,7 @@ where
|
||||
payload.timestamp.as_u64(),
|
||||
payload.withdrawals.is_some(),
|
||||
)?;
|
||||
let (tx, rx) = oneshot::channel();
|
||||
self.to_beacon_consensus.send(BeaconEngineMessage::NewPayload { payload, tx })?;
|
||||
Ok(rx.await??)
|
||||
Ok(self.beacon_consensus.new_payload(payload).await?)
|
||||
}
|
||||
|
||||
/// Sends a message to the beacon consensus engine to update the fork choice _without_
|
||||
@@ -97,13 +93,7 @@ where
|
||||
attrs.withdrawals.is_some(),
|
||||
)?;
|
||||
}
|
||||
let (tx, rx) = oneshot::channel();
|
||||
self.to_beacon_consensus.send(BeaconEngineMessage::ForkchoiceUpdated {
|
||||
state,
|
||||
payload_attrs,
|
||||
tx,
|
||||
})?;
|
||||
Ok(rx.await??)
|
||||
Ok(self.beacon_consensus.fork_choice_updated(state, payload_attrs).await?)
|
||||
}
|
||||
|
||||
/// Sends a message to the beacon consensus engine to update the fork choice _with_ withdrawals,
|
||||
@@ -122,13 +112,7 @@ where
|
||||
attrs.withdrawals.is_some(),
|
||||
)?;
|
||||
}
|
||||
let (tx, rx) = oneshot::channel();
|
||||
self.to_beacon_consensus.send(BeaconEngineMessage::ForkchoiceUpdated {
|
||||
state,
|
||||
payload_attrs,
|
||||
tx,
|
||||
})?;
|
||||
Ok(rx.await??)
|
||||
Ok(self.beacon_consensus.fork_choice_updated(state, payload_attrs).await?)
|
||||
}
|
||||
|
||||
/// Returns the most recent version of the payload that is available in the corresponding
|
||||
@@ -424,6 +408,7 @@ impl<Client> std::fmt::Debug for EngineApi<Client> {
|
||||
mod tests {
|
||||
use super::*;
|
||||
use assert_matches::assert_matches;
|
||||
use reth_beacon_consensus::BeaconEngineMessage;
|
||||
use reth_interfaces::test_utils::generators::random_block;
|
||||
use reth_payload_builder::test_utils::spawn_test_payload_service;
|
||||
use reth_primitives::{SealedBlock, H256, MAINNET};
|
||||
@@ -435,11 +420,11 @@ mod tests {
|
||||
let chain_spec = Arc::new(MAINNET.clone());
|
||||
let client = Arc::new(MockEthProvider::default());
|
||||
let payload_store = spawn_test_payload_service();
|
||||
let (to_beacon_consensus, engine_rx) = unbounded_channel();
|
||||
let (to_engine, engine_rx) = unbounded_channel();
|
||||
let api = EngineApi::new(
|
||||
client.clone(),
|
||||
chain_spec.clone(),
|
||||
to_beacon_consensus,
|
||||
BeaconConsensusEngineHandle::new(to_engine),
|
||||
payload_store.into(),
|
||||
);
|
||||
let handle = EngineApiTestHandle { chain_spec, client, from_api: engine_rx };
|
||||
|
||||
Reference in New Issue
Block a user