feat(cli): consensus engine events (#2473)

This commit is contained in:
Roman Krasiuk
2023-05-01 18:46:03 +03:00
committed by GitHub
parent 36004c6761
commit 7f764028d2
13 changed files with 152 additions and 81 deletions

2
Cargo.lock generated
View File

@@ -5147,6 +5147,8 @@ dependencies = [
"test-fuzz",
"thiserror",
"tiny-keccak",
"tokio",
"tokio-stream",
"triehash",
"url",
]

View File

@@ -1,6 +1,7 @@
//! Support for handling events emitted by node components.
use futures::Stream;
use reth_beacon_consensus::BeaconConsensusEngineEvent;
use reth_network::{NetworkEvent, NetworkHandle};
use reth_network_api::PeersInfo;
use reth_primitives::BlockNumber;
@@ -71,6 +72,20 @@ impl NodeState {
_ => (),
}
}
fn handle_consensus_engine_event(&self, event: BeaconConsensusEngineEvent) {
match event {
BeaconConsensusEngineEvent::ForkchoiceUpdated(state) => {
info!(target: "reth::cli", ?state, "Forkchoice updated");
}
BeaconConsensusEngineEvent::CanonicalBlockAdded(number, hash) => {
info!(target: "reth::cli", number, ?hash, "Block added to canonical chain");
}
BeaconConsensusEngineEvent::ForkBlockAdded(number, hash) => {
info!(target: "reth::cli", number, ?hash, "Block added to fork chain");
}
}
}
}
/// A node event.
@@ -80,19 +95,28 @@ pub enum NodeEvent {
Network(NetworkEvent),
/// A sync pipeline event.
Pipeline(PipelineEvent),
/// A consensus engine event.
ConsensusEngine(BeaconConsensusEngineEvent),
}
impl From<NetworkEvent> for NodeEvent {
fn from(evt: NetworkEvent) -> NodeEvent {
NodeEvent::Network(evt)
fn from(event: NetworkEvent) -> NodeEvent {
NodeEvent::Network(event)
}
}
impl From<PipelineEvent> for NodeEvent {
fn from(evt: PipelineEvent) -> NodeEvent {
NodeEvent::Pipeline(evt)
fn from(event: PipelineEvent) -> NodeEvent {
NodeEvent::Pipeline(event)
}
}
impl From<BeaconConsensusEngineEvent> for NodeEvent {
fn from(event: BeaconConsensusEngineEvent) -> Self {
NodeEvent::ConsensusEngine(event)
}
}
/// Displays relevant information to the user from components of the node, and periodically
/// displays the high-level status of the node.
pub async fn handle_events(
@@ -144,6 +168,9 @@ where
NodeEvent::Pipeline(event) => {
this.state.handle_pipeline_event(event);
}
NodeEvent::ConsensusEngine(event) => {
this.state.handle_consensus_engine_event(event);
}
}
}

View File

@@ -312,13 +312,6 @@ impl Command {
.await?
};
let events = stream_select(
network.event_listener().map(Into::into),
pipeline.events().map(Into::into),
);
ctx.task_executor
.spawn_critical("events task", events::handle_events(Some(network.clone()), events));
// configure the payload builder
let payload_generator = BasicPayloadJobGenerator::new(
blockchain_db.clone(),
@@ -333,6 +326,7 @@ impl Command {
debug!(target: "reth::cli", "Spawning payload builder service");
ctx.task_executor.spawn_critical("payload builder service", payload_service);
let pipeline_events = pipeline.events();
let (beacon_consensus_engine, beacon_engine_handle) = BeaconConsensusEngine::with_channel(
Arc::clone(&db),
ctx.task_executor.clone(),
@@ -346,6 +340,16 @@ impl Command {
);
info!(target: "reth::cli", "Consensus engine initialized");
let events = stream_select(
stream_select(
network.event_listener().map(Into::into),
beacon_engine_handle.event_listener().map(Into::into),
),
pipeline_events.map(Into::into),
);
ctx.task_executor
.spawn_critical("events task", events::handle_events(Some(network.clone()), events));
let engine_api = EngineApi::new(
blockchain_db.clone(),
self.chain.clone(),

View File

@@ -0,0 +1,13 @@
use reth_interfaces::consensus::ForkchoiceState;
use reth_primitives::{BlockHash, BlockNumber};
/// Events emitted by [crate::BeaconConsensusEngine].
#[derive(Clone, Debug)]
pub enum BeaconConsensusEngineEvent {
/// The fork choice state was updated.
ForkchoiceUpdated(ForkchoiceState),
/// A block was added to the canonical chain.
CanonicalBlockAdded(BlockNumber, BlockHash),
/// A block was added to the fork chain.
ForkBlockAdded(BlockNumber, BlockHash),
}

View File

@@ -1,4 +1,4 @@
use crate::BeaconEngineResult;
use crate::{BeaconConsensusEngineEvent, BeaconEngineResult};
use futures::{future::Either, FutureExt};
use reth_interfaces::consensus::ForkchoiceState;
use reth_payload_builder::error::PayloadBuilderError;
@@ -11,7 +11,7 @@ use std::{
pin::Pin,
task::{ready, Context, Poll},
};
use tokio::sync::oneshot;
use tokio::sync::{mpsc::UnboundedSender, oneshot};
/// Represents the outcome of forkchoice update.
///
@@ -131,4 +131,6 @@ pub enum BeaconEngineMessage {
/// The sender for returning forkchoice updated result.
tx: oneshot::Sender<OnForkChoiceUpdated>,
},
/// Add a new listener for [`BeaconEngineMessage`].
EventListener(UnboundedSender<BeaconConsensusEngineEvent>),
}

View File

@@ -1,4 +1,4 @@
use crate::engine::metrics::Metrics;
use crate::engine::{message::OnForkChoiceUpdated, metrics::Metrics};
use futures::{Future, FutureExt, StreamExt, TryFutureExt};
use reth_db::{database::Database, tables, transaction::DbTx};
use reth_interfaces::{
@@ -9,7 +9,7 @@ use reth_interfaces::{
Error,
};
use reth_payload_builder::{PayloadBuilderAttributes, PayloadBuilderHandle};
use reth_primitives::{BlockNumber, Header, SealedBlock, H256};
use reth_primitives::{listener::EventListeners, BlockNumber, Header, SealedBlock, H256};
use reth_rpc_types::engine::{
ExecutionPayload, ForkchoiceUpdated, PayloadAttributes, PayloadStatus, PayloadStatusEnum,
};
@@ -35,10 +35,13 @@ mod error;
pub use error::{BeaconEngineError, BeaconEngineResult, BeaconForkChoiceUpdateError};
mod metrics;
mod pipeline_state;
use crate::engine::message::OnForkChoiceUpdated;
pub use pipeline_state::PipelineState;
mod event;
pub use event::BeaconConsensusEngineEvent;
/// A _shareable_ beacon consensus frontend. Used to interact with the spawned beacon consensus
/// engine.
///
@@ -98,6 +101,13 @@ impl BeaconConsensusEngineHandle {
});
rx
}
/// Creates a new [`BeaconConsensusEngineEvent`] listener stream.
pub fn event_listener(&self) -> UnboundedReceiverStream<BeaconConsensusEngineEvent> {
let (tx, rx) = mpsc::unbounded_channel();
let _ = self.to_engine.send(BeaconEngineMessage::EventListener(tx));
UnboundedReceiverStream::new(rx)
}
}
/// The beacon consensus engine is the driver that switches between historical and live sync.
@@ -151,6 +161,8 @@ where
continuous: bool,
/// The payload store.
payload_builder: PayloadBuilderHandle,
/// Listeners for engine events.
listeners: EventListeners<BeaconConsensusEngineEvent>,
/// Consensus engine metrics.
metrics: Metrics,
}
@@ -213,6 +225,7 @@ where
max_block,
continuous,
payload_builder,
listeners: EventListeners::default(),
metrics: Metrics::default(),
};
@@ -316,6 +329,7 @@ where
PayloadStatus::from_status(PayloadStatusEnum::Syncing)
};
self.listeners.notify(BeaconConsensusEngineEvent::ForkchoiceUpdated(state));
trace!(target: "consensus::engine", ?state, ?status, "Returning forkchoice status");
Ok(OnForkChoiceUpdated::valid(status))
}
@@ -387,14 +401,25 @@ where
};
let status = if self.is_pipeline_idle() {
let block_hash = block.hash;
match self.blockchain_tree.insert_block_without_senders(block) {
Ok(status) => {
let latest_valid_hash =
matches!(status, BlockStatus::Valid).then_some(block_hash);
let mut latest_valid_hash = None;
let status = match status {
BlockStatus::Valid => PayloadStatusEnum::Valid,
BlockStatus::Accepted => PayloadStatusEnum::Accepted,
BlockStatus::Valid => {
latest_valid_hash = Some(block_hash);
self.listeners.notify(BeaconConsensusEngineEvent::CanonicalBlockAdded(
block_number,
block_hash,
));
PayloadStatusEnum::Valid
}
BlockStatus::Accepted => {
self.listeners.notify(BeaconConsensusEngineEvent::ForkBlockAdded(
block_number,
block_hash,
));
PayloadStatusEnum::Accepted
}
BlockStatus::Disconnected => PayloadStatusEnum::Syncing,
};
PayloadStatus::new(status, latest_valid_hash)
@@ -553,6 +578,9 @@ where
let status = this.on_new_payload(payload);
let _ = tx.send(Ok(status));
}
BeaconEngineMessage::EventListener(tx) => {
this.listeners.push_listener(tx);
}
}
}

View File

@@ -40,7 +40,7 @@ use reth_eth_wire::{
};
use reth_net_common::bandwidth_meter::BandwidthMeter;
use reth_network_api::ReputationChangeKind;
use reth_primitives::{NodeRecord, PeerId, H256};
use reth_primitives::{listener::EventListeners, NodeRecord, PeerId, H256};
use reth_provider::BlockProvider;
use reth_rpc_types::{EthProtocolInfo, NetworkStatus};
use std::{
@@ -94,7 +94,7 @@ pub struct NetworkManager<C> {
/// Handles block imports according to the `eth` protocol.
block_import: Box<dyn BlockImport>,
/// All listeners for high level network events.
event_listeners: NetworkEventListeners,
event_listeners: EventListeners<NetworkEvent>,
/// Sender half to send events to the
/// [`TransactionsManager`](crate::transactions::TransactionsManager) task, if configured.
to_transactions_manager: Option<mpsc::UnboundedSender<NetworkTransactionEvent>>,
@@ -494,7 +494,7 @@ where
fn on_handle_message(&mut self, msg: NetworkHandleMessage) {
match msg {
NetworkHandleMessage::EventListener(tx) => {
self.event_listeners.listeners.push(tx);
self.event_listeners.push_listener(tx);
}
NetworkHandleMessage::AnnounceBlock(block, hash) => {
if self.handle.mode().is_stake() {
@@ -669,7 +669,7 @@ where
.peers_mut()
.on_incoming_session_established(peer_id, remote_addr);
}
this.event_listeners.send(NetworkEvent::SessionEstablished {
this.event_listeners.notify(NetworkEvent::SessionEstablished {
peer_id,
remote_addr,
client_version,
@@ -681,14 +681,14 @@ where
}
SwarmEvent::PeerAdded(peer_id) => {
trace!(target: "net", ?peer_id, "Peer added");
this.event_listeners.send(NetworkEvent::PeerAdded(peer_id));
this.event_listeners.notify(NetworkEvent::PeerAdded(peer_id));
this.metrics
.tracked_peers
.set(this.swarm.state().peers().num_known_peers() as f64);
}
SwarmEvent::PeerRemoved(peer_id) => {
trace!(target: "net", ?peer_id, "Peer dropped");
this.event_listeners.send(NetworkEvent::PeerRemoved(peer_id));
this.event_listeners.notify(NetworkEvent::PeerRemoved(peer_id));
this.metrics
.tracked_peers
.set(this.swarm.state().peers().num_known_peers() as f64);
@@ -739,7 +739,7 @@ where
as f64,
);
this.event_listeners
.send(NetworkEvent::SessionClosed { peer_id, reason });
.notify(NetworkEvent::SessionClosed { peer_id, reason });
}
SwarmEvent::IncomingPendingSessionClosed { remote_addr, error } => {
warn!(
@@ -899,27 +899,3 @@ pub enum NetworkEvent {
/// Event emitted when a new peer is removed
PeerRemoved(PeerId),
}
/// Bundles all listeners for [`NetworkEvent`]s.
#[derive(Default)]
struct NetworkEventListeners {
/// All listeners for an event
listeners: Vec<mpsc::UnboundedSender<NetworkEvent>>,
}
// === impl NetworkEventListeners ===
impl NetworkEventListeners {
/// Sends the event to all listeners.
///
/// Remove channels that got closed.
fn send(&mut self, event: NetworkEvent) {
self.listeners.retain(|listener| {
let open = listener.send(event.clone()).is_ok();
if !open {
trace!(target : "net", "event listener channel closed",);
}
open
});
}
}

View File

@@ -36,6 +36,10 @@ secp256k1 = { version = "0.27.0", default-features = false, features = [
# used for forkid
crc = "3"
# tokio
tokio = { version = "1", default-features = false, features = ["sync"] }
tokio-stream = "0.1"
# misc
bytes = "1.4"
serde = "1.0"

View File

@@ -26,6 +26,7 @@ mod hardfork;
mod header;
mod hex_bytes;
mod integer_list;
pub mod listener;
mod log;
mod net;
mod peer;

View File

@@ -0,0 +1,36 @@
use tokio::sync::mpsc;
use tokio_stream::wrappers::UnboundedReceiverStream;
/// A collection of event listeners for a task.
#[derive(Clone, Debug)]
pub struct EventListeners<T> {
/// All listeners for events
listeners: Vec<mpsc::UnboundedSender<T>>,
}
impl<T> Default for EventListeners<T> {
fn default() -> Self {
Self { listeners: Vec::new() }
}
}
impl<T: Clone> EventListeners<T> {
/// Send an event to all listeners.
///
/// Channels that were closed are removed.
pub fn notify(&mut self, event: T) {
self.listeners.retain(|listener| listener.send(event.clone()).is_ok())
}
/// Add a new event listener.
pub fn new_listener(&mut self) -> UnboundedReceiverStream<T> {
let (sender, receiver) = mpsc::unbounded_channel();
self.listeners.push(sender);
UnboundedReceiverStream::new(receiver)
}
/// Push new event listener.
pub fn push_listener(&mut self, listener: mpsc::UnboundedSender<T>) {
self.listeners.push(listener);
}
}

View File

@@ -0,0 +1,4 @@
//! Event listeners.
mod event_listeners;
pub use event_listeners::EventListeners;

View File

@@ -3,8 +3,6 @@ use crate::{
stage::{ExecOutput, UnwindInput, UnwindOutput},
};
use reth_primitives::BlockNumber;
use tokio::sync::mpsc;
use tokio_stream::wrappers::UnboundedReceiverStream;
/// An event emitted by a [Pipeline][crate::Pipeline].
///
@@ -58,27 +56,3 @@ pub enum PipelineEvent {
stage_id: StageId,
},
}
/// Bundles all listeners for [`PipelineEvent`]s
// TODO: Make this a generic utility since the same struct exists in `reth/crates/net/network/src/manager.rs` and sort of in `https://github.com/paradigmxyz/reth/blob/01cb6c07df3205ee2bb55853d39302a7dfefc912/crates/net/discv4/src/lib.rs#L662-L671`
#[derive(Default, Clone, Debug)]
pub(crate) struct PipelineEventListeners {
/// All listeners for events
listeners: Vec<mpsc::UnboundedSender<PipelineEvent>>,
}
impl PipelineEventListeners {
/// Send an event to all listeners.
///
/// Channels that were closed are removed.
pub(crate) fn notify(&mut self, event: PipelineEvent) {
self.listeners.retain(|listener| listener.send(event.clone()).is_ok())
}
/// Add a new event listener.
pub(crate) fn new_listener(&mut self) -> UnboundedReceiverStream<PipelineEvent> {
let (sender, receiver) = mpsc::unbounded_channel();
self.listeners.push(sender);
UnboundedReceiverStream::new(receiver)
}
}

View File

@@ -2,7 +2,7 @@ use crate::{error::*, ExecInput, ExecOutput, Stage, StageError, StageId, UnwindI
use futures_util::Future;
use reth_db::database::Database;
use reth_interfaces::sync::{SyncState, SyncStateUpdater};
use reth_primitives::{BlockNumber, H256};
use reth_primitives::{listener::EventListeners, BlockNumber, H256};
use reth_provider::Transaction;
use std::{
fmt::{Debug, Formatter},
@@ -82,7 +82,7 @@ pub struct Pipeline<DB: Database, U: SyncStateUpdater> {
stages: Vec<BoxedStage<DB>>,
max_block: Option<BlockNumber>,
continuous: bool,
listeners: PipelineEventListeners,
listeners: EventListeners<PipelineEvent>,
sync_state_updater: Option<U>,
progress: PipelineProgress,
tip_tx: Option<watch::Sender<H256>>,
@@ -102,7 +102,7 @@ impl<DB: Database, U: SyncStateUpdater> Default for Pipeline<DB, U> {
stages: Vec::new(),
max_block: None,
continuous: false,
listeners: PipelineEventListeners::default(),
listeners: EventListeners::default(),
sync_state_updater: None,
progress: PipelineProgress::default(),
tip_tx: None,