From 7f764028d22ab39e732e9a85d52c698d680f84da Mon Sep 17 00:00:00 2001 From: Roman Krasiuk Date: Mon, 1 May 2023 18:46:03 +0300 Subject: [PATCH] feat(cli): consensus engine events (#2473) --- Cargo.lock | 2 + bin/reth/src/node/events.rs | 35 +++++++++++++-- bin/reth/src/node/mod.rs | 18 +++++--- crates/consensus/beacon/src/engine/event.rs | 13 ++++++ crates/consensus/beacon/src/engine/message.rs | 6 ++- crates/consensus/beacon/src/engine/mod.rs | 44 +++++++++++++++---- crates/net/network/src/manager.rs | 38 +++------------- crates/primitives/Cargo.toml | 4 ++ crates/primitives/src/lib.rs | 1 + .../src/listener/event_listeners.rs | 36 +++++++++++++++ crates/primitives/src/listener/mod.rs | 4 ++ crates/stages/src/pipeline/event.rs | 26 ----------- crates/stages/src/pipeline/mod.rs | 6 +-- 13 files changed, 152 insertions(+), 81 deletions(-) create mode 100644 crates/consensus/beacon/src/engine/event.rs create mode 100644 crates/primitives/src/listener/event_listeners.rs create mode 100644 crates/primitives/src/listener/mod.rs diff --git a/Cargo.lock b/Cargo.lock index e06a080aa9..d8c2fc5bb4 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -5147,6 +5147,8 @@ dependencies = [ "test-fuzz", "thiserror", "tiny-keccak", + "tokio", + "tokio-stream", "triehash", "url", ] diff --git a/bin/reth/src/node/events.rs b/bin/reth/src/node/events.rs index b4ef7a84e7..4449a740d2 100644 --- a/bin/reth/src/node/events.rs +++ b/bin/reth/src/node/events.rs @@ -1,6 +1,7 @@ //! Support for handling events emitted by node components. use futures::Stream; +use reth_beacon_consensus::BeaconConsensusEngineEvent; use reth_network::{NetworkEvent, NetworkHandle}; use reth_network_api::PeersInfo; use reth_primitives::BlockNumber; @@ -71,6 +72,20 @@ impl NodeState { _ => (), } } + + fn handle_consensus_engine_event(&self, event: BeaconConsensusEngineEvent) { + match event { + BeaconConsensusEngineEvent::ForkchoiceUpdated(state) => { + info!(target: "reth::cli", ?state, "Forkchoice updated"); + } + BeaconConsensusEngineEvent::CanonicalBlockAdded(number, hash) => { + info!(target: "reth::cli", number, ?hash, "Block added to canonical chain"); + } + BeaconConsensusEngineEvent::ForkBlockAdded(number, hash) => { + info!(target: "reth::cli", number, ?hash, "Block added to fork chain"); + } + } + } } /// A node event. @@ -80,19 +95,28 @@ pub enum NodeEvent { Network(NetworkEvent), /// A sync pipeline event. Pipeline(PipelineEvent), + /// A consensus engine event. + ConsensusEngine(BeaconConsensusEngineEvent), } impl From for NodeEvent { - fn from(evt: NetworkEvent) -> NodeEvent { - NodeEvent::Network(evt) + fn from(event: NetworkEvent) -> NodeEvent { + NodeEvent::Network(event) } } impl From for NodeEvent { - fn from(evt: PipelineEvent) -> NodeEvent { - NodeEvent::Pipeline(evt) + fn from(event: PipelineEvent) -> NodeEvent { + NodeEvent::Pipeline(event) } } + +impl From for NodeEvent { + fn from(event: BeaconConsensusEngineEvent) -> Self { + NodeEvent::ConsensusEngine(event) + } +} + /// Displays relevant information to the user from components of the node, and periodically /// displays the high-level status of the node. pub async fn handle_events( @@ -144,6 +168,9 @@ where NodeEvent::Pipeline(event) => { this.state.handle_pipeline_event(event); } + NodeEvent::ConsensusEngine(event) => { + this.state.handle_consensus_engine_event(event); + } } } diff --git a/bin/reth/src/node/mod.rs b/bin/reth/src/node/mod.rs index da14513e2e..2edb2ca7a5 100644 --- a/bin/reth/src/node/mod.rs +++ b/bin/reth/src/node/mod.rs @@ -312,13 +312,6 @@ impl Command { .await? }; - let events = stream_select( - network.event_listener().map(Into::into), - pipeline.events().map(Into::into), - ); - ctx.task_executor - .spawn_critical("events task", events::handle_events(Some(network.clone()), events)); - // configure the payload builder let payload_generator = BasicPayloadJobGenerator::new( blockchain_db.clone(), @@ -333,6 +326,7 @@ impl Command { debug!(target: "reth::cli", "Spawning payload builder service"); ctx.task_executor.spawn_critical("payload builder service", payload_service); + let pipeline_events = pipeline.events(); let (beacon_consensus_engine, beacon_engine_handle) = BeaconConsensusEngine::with_channel( Arc::clone(&db), ctx.task_executor.clone(), @@ -346,6 +340,16 @@ impl Command { ); info!(target: "reth::cli", "Consensus engine initialized"); + let events = stream_select( + stream_select( + network.event_listener().map(Into::into), + beacon_engine_handle.event_listener().map(Into::into), + ), + pipeline_events.map(Into::into), + ); + ctx.task_executor + .spawn_critical("events task", events::handle_events(Some(network.clone()), events)); + let engine_api = EngineApi::new( blockchain_db.clone(), self.chain.clone(), diff --git a/crates/consensus/beacon/src/engine/event.rs b/crates/consensus/beacon/src/engine/event.rs new file mode 100644 index 0000000000..9959d102b0 --- /dev/null +++ b/crates/consensus/beacon/src/engine/event.rs @@ -0,0 +1,13 @@ +use reth_interfaces::consensus::ForkchoiceState; +use reth_primitives::{BlockHash, BlockNumber}; + +/// Events emitted by [crate::BeaconConsensusEngine]. +#[derive(Clone, Debug)] +pub enum BeaconConsensusEngineEvent { + /// The fork choice state was updated. + ForkchoiceUpdated(ForkchoiceState), + /// A block was added to the canonical chain. + CanonicalBlockAdded(BlockNumber, BlockHash), + /// A block was added to the fork chain. + ForkBlockAdded(BlockNumber, BlockHash), +} diff --git a/crates/consensus/beacon/src/engine/message.rs b/crates/consensus/beacon/src/engine/message.rs index f3eff843fd..fabc7c789e 100644 --- a/crates/consensus/beacon/src/engine/message.rs +++ b/crates/consensus/beacon/src/engine/message.rs @@ -1,4 +1,4 @@ -use crate::BeaconEngineResult; +use crate::{BeaconConsensusEngineEvent, BeaconEngineResult}; use futures::{future::Either, FutureExt}; use reth_interfaces::consensus::ForkchoiceState; use reth_payload_builder::error::PayloadBuilderError; @@ -11,7 +11,7 @@ use std::{ pin::Pin, task::{ready, Context, Poll}, }; -use tokio::sync::oneshot; +use tokio::sync::{mpsc::UnboundedSender, oneshot}; /// Represents the outcome of forkchoice update. /// @@ -131,4 +131,6 @@ pub enum BeaconEngineMessage { /// The sender for returning forkchoice updated result. tx: oneshot::Sender, }, + /// Add a new listener for [`BeaconEngineMessage`]. + EventListener(UnboundedSender), } diff --git a/crates/consensus/beacon/src/engine/mod.rs b/crates/consensus/beacon/src/engine/mod.rs index 780c41ae7b..67762c9380 100644 --- a/crates/consensus/beacon/src/engine/mod.rs +++ b/crates/consensus/beacon/src/engine/mod.rs @@ -1,4 +1,4 @@ -use crate::engine::metrics::Metrics; +use crate::engine::{message::OnForkChoiceUpdated, metrics::Metrics}; use futures::{Future, FutureExt, StreamExt, TryFutureExt}; use reth_db::{database::Database, tables, transaction::DbTx}; use reth_interfaces::{ @@ -9,7 +9,7 @@ use reth_interfaces::{ Error, }; use reth_payload_builder::{PayloadBuilderAttributes, PayloadBuilderHandle}; -use reth_primitives::{BlockNumber, Header, SealedBlock, H256}; +use reth_primitives::{listener::EventListeners, BlockNumber, Header, SealedBlock, H256}; use reth_rpc_types::engine::{ ExecutionPayload, ForkchoiceUpdated, PayloadAttributes, PayloadStatus, PayloadStatusEnum, }; @@ -35,10 +35,13 @@ mod error; pub use error::{BeaconEngineError, BeaconEngineResult, BeaconForkChoiceUpdateError}; mod metrics; + mod pipeline_state; -use crate::engine::message::OnForkChoiceUpdated; pub use pipeline_state::PipelineState; +mod event; +pub use event::BeaconConsensusEngineEvent; + /// A _shareable_ beacon consensus frontend. Used to interact with the spawned beacon consensus /// engine. /// @@ -98,6 +101,13 @@ impl BeaconConsensusEngineHandle { }); rx } + + /// Creates a new [`BeaconConsensusEngineEvent`] listener stream. + pub fn event_listener(&self) -> UnboundedReceiverStream { + let (tx, rx) = mpsc::unbounded_channel(); + let _ = self.to_engine.send(BeaconEngineMessage::EventListener(tx)); + UnboundedReceiverStream::new(rx) + } } /// The beacon consensus engine is the driver that switches between historical and live sync. @@ -151,6 +161,8 @@ where continuous: bool, /// The payload store. payload_builder: PayloadBuilderHandle, + /// Listeners for engine events. + listeners: EventListeners, /// Consensus engine metrics. metrics: Metrics, } @@ -213,6 +225,7 @@ where max_block, continuous, payload_builder, + listeners: EventListeners::default(), metrics: Metrics::default(), }; @@ -316,6 +329,7 @@ where PayloadStatus::from_status(PayloadStatusEnum::Syncing) }; + self.listeners.notify(BeaconConsensusEngineEvent::ForkchoiceUpdated(state)); trace!(target: "consensus::engine", ?state, ?status, "Returning forkchoice status"); Ok(OnForkChoiceUpdated::valid(status)) } @@ -387,14 +401,25 @@ where }; let status = if self.is_pipeline_idle() { - let block_hash = block.hash; match self.blockchain_tree.insert_block_without_senders(block) { Ok(status) => { - let latest_valid_hash = - matches!(status, BlockStatus::Valid).then_some(block_hash); + let mut latest_valid_hash = None; let status = match status { - BlockStatus::Valid => PayloadStatusEnum::Valid, - BlockStatus::Accepted => PayloadStatusEnum::Accepted, + BlockStatus::Valid => { + latest_valid_hash = Some(block_hash); + self.listeners.notify(BeaconConsensusEngineEvent::CanonicalBlockAdded( + block_number, + block_hash, + )); + PayloadStatusEnum::Valid + } + BlockStatus::Accepted => { + self.listeners.notify(BeaconConsensusEngineEvent::ForkBlockAdded( + block_number, + block_hash, + )); + PayloadStatusEnum::Accepted + } BlockStatus::Disconnected => PayloadStatusEnum::Syncing, }; PayloadStatus::new(status, latest_valid_hash) @@ -553,6 +578,9 @@ where let status = this.on_new_payload(payload); let _ = tx.send(Ok(status)); } + BeaconEngineMessage::EventListener(tx) => { + this.listeners.push_listener(tx); + } } } diff --git a/crates/net/network/src/manager.rs b/crates/net/network/src/manager.rs index af438f4ef1..692693992d 100644 --- a/crates/net/network/src/manager.rs +++ b/crates/net/network/src/manager.rs @@ -40,7 +40,7 @@ use reth_eth_wire::{ }; use reth_net_common::bandwidth_meter::BandwidthMeter; use reth_network_api::ReputationChangeKind; -use reth_primitives::{NodeRecord, PeerId, H256}; +use reth_primitives::{listener::EventListeners, NodeRecord, PeerId, H256}; use reth_provider::BlockProvider; use reth_rpc_types::{EthProtocolInfo, NetworkStatus}; use std::{ @@ -94,7 +94,7 @@ pub struct NetworkManager { /// Handles block imports according to the `eth` protocol. block_import: Box, /// All listeners for high level network events. - event_listeners: NetworkEventListeners, + event_listeners: EventListeners, /// Sender half to send events to the /// [`TransactionsManager`](crate::transactions::TransactionsManager) task, if configured. to_transactions_manager: Option>, @@ -494,7 +494,7 @@ where fn on_handle_message(&mut self, msg: NetworkHandleMessage) { match msg { NetworkHandleMessage::EventListener(tx) => { - self.event_listeners.listeners.push(tx); + self.event_listeners.push_listener(tx); } NetworkHandleMessage::AnnounceBlock(block, hash) => { if self.handle.mode().is_stake() { @@ -669,7 +669,7 @@ where .peers_mut() .on_incoming_session_established(peer_id, remote_addr); } - this.event_listeners.send(NetworkEvent::SessionEstablished { + this.event_listeners.notify(NetworkEvent::SessionEstablished { peer_id, remote_addr, client_version, @@ -681,14 +681,14 @@ where } SwarmEvent::PeerAdded(peer_id) => { trace!(target: "net", ?peer_id, "Peer added"); - this.event_listeners.send(NetworkEvent::PeerAdded(peer_id)); + this.event_listeners.notify(NetworkEvent::PeerAdded(peer_id)); this.metrics .tracked_peers .set(this.swarm.state().peers().num_known_peers() as f64); } SwarmEvent::PeerRemoved(peer_id) => { trace!(target: "net", ?peer_id, "Peer dropped"); - this.event_listeners.send(NetworkEvent::PeerRemoved(peer_id)); + this.event_listeners.notify(NetworkEvent::PeerRemoved(peer_id)); this.metrics .tracked_peers .set(this.swarm.state().peers().num_known_peers() as f64); @@ -739,7 +739,7 @@ where as f64, ); this.event_listeners - .send(NetworkEvent::SessionClosed { peer_id, reason }); + .notify(NetworkEvent::SessionClosed { peer_id, reason }); } SwarmEvent::IncomingPendingSessionClosed { remote_addr, error } => { warn!( @@ -899,27 +899,3 @@ pub enum NetworkEvent { /// Event emitted when a new peer is removed PeerRemoved(PeerId), } - -/// Bundles all listeners for [`NetworkEvent`]s. -#[derive(Default)] -struct NetworkEventListeners { - /// All listeners for an event - listeners: Vec>, -} - -// === impl NetworkEventListeners === - -impl NetworkEventListeners { - /// Sends the event to all listeners. - /// - /// Remove channels that got closed. - fn send(&mut self, event: NetworkEvent) { - self.listeners.retain(|listener| { - let open = listener.send(event.clone()).is_ok(); - if !open { - trace!(target : "net", "event listener channel closed",); - } - open - }); - } -} diff --git a/crates/primitives/Cargo.toml b/crates/primitives/Cargo.toml index f4a4eea5ef..e76268fc49 100644 --- a/crates/primitives/Cargo.toml +++ b/crates/primitives/Cargo.toml @@ -36,6 +36,10 @@ secp256k1 = { version = "0.27.0", default-features = false, features = [ # used for forkid crc = "3" +# tokio +tokio = { version = "1", default-features = false, features = ["sync"] } +tokio-stream = "0.1" + # misc bytes = "1.4" serde = "1.0" diff --git a/crates/primitives/src/lib.rs b/crates/primitives/src/lib.rs index 62810e2b05..f191698bf1 100644 --- a/crates/primitives/src/lib.rs +++ b/crates/primitives/src/lib.rs @@ -26,6 +26,7 @@ mod hardfork; mod header; mod hex_bytes; mod integer_list; +pub mod listener; mod log; mod net; mod peer; diff --git a/crates/primitives/src/listener/event_listeners.rs b/crates/primitives/src/listener/event_listeners.rs new file mode 100644 index 0000000000..d99793fb3d --- /dev/null +++ b/crates/primitives/src/listener/event_listeners.rs @@ -0,0 +1,36 @@ +use tokio::sync::mpsc; +use tokio_stream::wrappers::UnboundedReceiverStream; + +/// A collection of event listeners for a task. +#[derive(Clone, Debug)] +pub struct EventListeners { + /// All listeners for events + listeners: Vec>, +} + +impl Default for EventListeners { + fn default() -> Self { + Self { listeners: Vec::new() } + } +} + +impl EventListeners { + /// Send an event to all listeners. + /// + /// Channels that were closed are removed. + pub fn notify(&mut self, event: T) { + self.listeners.retain(|listener| listener.send(event.clone()).is_ok()) + } + + /// Add a new event listener. + pub fn new_listener(&mut self) -> UnboundedReceiverStream { + let (sender, receiver) = mpsc::unbounded_channel(); + self.listeners.push(sender); + UnboundedReceiverStream::new(receiver) + } + + /// Push new event listener. + pub fn push_listener(&mut self, listener: mpsc::UnboundedSender) { + self.listeners.push(listener); + } +} diff --git a/crates/primitives/src/listener/mod.rs b/crates/primitives/src/listener/mod.rs new file mode 100644 index 0000000000..553d18d678 --- /dev/null +++ b/crates/primitives/src/listener/mod.rs @@ -0,0 +1,4 @@ +//! Event listeners. + +mod event_listeners; +pub use event_listeners::EventListeners; diff --git a/crates/stages/src/pipeline/event.rs b/crates/stages/src/pipeline/event.rs index fa003f8128..89b24490ee 100644 --- a/crates/stages/src/pipeline/event.rs +++ b/crates/stages/src/pipeline/event.rs @@ -3,8 +3,6 @@ use crate::{ stage::{ExecOutput, UnwindInput, UnwindOutput}, }; use reth_primitives::BlockNumber; -use tokio::sync::mpsc; -use tokio_stream::wrappers::UnboundedReceiverStream; /// An event emitted by a [Pipeline][crate::Pipeline]. /// @@ -58,27 +56,3 @@ pub enum PipelineEvent { stage_id: StageId, }, } - -/// Bundles all listeners for [`PipelineEvent`]s -// TODO: Make this a generic utility since the same struct exists in `reth/crates/net/network/src/manager.rs` and sort of in `https://github.com/paradigmxyz/reth/blob/01cb6c07df3205ee2bb55853d39302a7dfefc912/crates/net/discv4/src/lib.rs#L662-L671` -#[derive(Default, Clone, Debug)] -pub(crate) struct PipelineEventListeners { - /// All listeners for events - listeners: Vec>, -} - -impl PipelineEventListeners { - /// Send an event to all listeners. - /// - /// Channels that were closed are removed. - pub(crate) fn notify(&mut self, event: PipelineEvent) { - self.listeners.retain(|listener| listener.send(event.clone()).is_ok()) - } - - /// Add a new event listener. - pub(crate) fn new_listener(&mut self) -> UnboundedReceiverStream { - let (sender, receiver) = mpsc::unbounded_channel(); - self.listeners.push(sender); - UnboundedReceiverStream::new(receiver) - } -} diff --git a/crates/stages/src/pipeline/mod.rs b/crates/stages/src/pipeline/mod.rs index e445a3ae0f..d7ef875f34 100644 --- a/crates/stages/src/pipeline/mod.rs +++ b/crates/stages/src/pipeline/mod.rs @@ -2,7 +2,7 @@ use crate::{error::*, ExecInput, ExecOutput, Stage, StageError, StageId, UnwindI use futures_util::Future; use reth_db::database::Database; use reth_interfaces::sync::{SyncState, SyncStateUpdater}; -use reth_primitives::{BlockNumber, H256}; +use reth_primitives::{listener::EventListeners, BlockNumber, H256}; use reth_provider::Transaction; use std::{ fmt::{Debug, Formatter}, @@ -82,7 +82,7 @@ pub struct Pipeline { stages: Vec>, max_block: Option, continuous: bool, - listeners: PipelineEventListeners, + listeners: EventListeners, sync_state_updater: Option, progress: PipelineProgress, tip_tx: Option>, @@ -102,7 +102,7 @@ impl Default for Pipeline { stages: Vec::new(), max_block: None, continuous: false, - listeners: PipelineEventListeners::default(), + listeners: EventListeners::default(), sync_state_updater: None, progress: PipelineProgress::default(), tip_tx: None,