feat: emit beacon event after handled FCU (#9648)

This commit is contained in:
Matthias Seitz
2024-07-19 15:11:48 +02:00
committed by GitHub
parent 99ef2b7799
commit ccb16dc071
2 changed files with 29 additions and 2 deletions

View File

@@ -6,7 +6,7 @@ use crate::{
tree::TreeEvent,
};
use futures::{Stream, StreamExt};
use reth_beacon_consensus::BeaconEngineMessage;
use reth_beacon_consensus::{BeaconConsensusEngineEvent, BeaconEngineMessage};
use reth_engine_primitives::EngineTypes;
use reth_primitives::{SealedBlockWithSenders, B256};
use std::{
@@ -185,10 +185,19 @@ where
/// Events emitted by the engine API handler.
#[derive(Debug)]
pub enum EngineApiEvent {
/// Event from the consensus engine.
// TODO(mattsse): find a more appropriate name for this variant, consider phasing it out.
BeaconConsensus(BeaconConsensusEngineEvent),
/// Bubbled from tree.
FromTree(TreeEvent),
}
impl From<BeaconConsensusEngineEvent> for EngineApiEvent {
fn from(event: BeaconConsensusEngineEvent) -> Self {
Self::BeaconConsensus(event)
}
}
#[derive(Debug)]
pub enum FromEngine<Req> {
/// Event from the top level orchestrator.

View File

@@ -6,7 +6,8 @@ use crate::{
};
pub use memory_overlay::MemoryOverlayStateProvider;
use reth_beacon_consensus::{
BeaconEngineMessage, ForkchoiceStateTracker, InvalidHeaderCache, OnForkChoiceUpdated,
BeaconConsensusEngineEvent, BeaconEngineMessage, ForkchoiceStateTracker, InvalidHeaderCache,
OnForkChoiceUpdated,
};
use reth_blockchain_tree::{
error::InsertBlockErrorKind, BlockAttachment, BlockBuffer, BlockStatus,
@@ -375,6 +376,15 @@ where
FromEngine::Request(request) => match request {
BeaconEngineMessage::ForkchoiceUpdated { state, payload_attrs, tx } => {
let output = self.on_forkchoice_updated(state, payload_attrs);
if let Ok(res) = &output {
// emit an event about the handled FCU
self.emit_event(BeaconConsensusEngineEvent::ForkchoiceUpdated(
state,
res.outcome.forkchoice_status(),
));
}
if let Err(err) = tx.send(output.map(|o| o.outcome).map_err(Into::into)) {
error!("Failed to send event: {err:?}");
}
@@ -429,6 +439,14 @@ where
}
}
/// Emits an outgoing event to the engine.
fn emit_event(&self, event: impl Into<EngineApiEvent>) {
let _ = self
.outgoing
.send(event.into())
.inspect_err(|err| error!("Failed to send internal event: {err:?}"));
}
/// Returns true if the canonical chain length minus the last persisted
/// block is greater than or equal to the persistence threshold.
fn should_persist(&self) -> bool {