diff --git a/bin/reth/src/chain/import.rs b/bin/reth/src/chain/import.rs index f5008f2da9..2173d953dd 100644 --- a/bin/reth/src/chain/import.rs +++ b/bin/reth/src/chain/import.rs @@ -1,6 +1,6 @@ use crate::{ dirs::{ConfigPath, DbPath, PlatformPath}, - node::{handle_events, NodeEvent}, + node::events::{handle_events, NodeEvent}, }; use clap::{crate_version, Parser}; use eyre::Context; @@ -109,7 +109,7 @@ impl ImportCommand { let (mut pipeline, events) = self.build_import_pipeline(config, db.clone(), &consensus, file_client).await?; - tokio::spawn(handle_events(events)); + tokio::spawn(handle_events(None, events)); // Run pipeline info!(target: "reth::cli", "Starting sync pipeline"); diff --git a/bin/reth/src/node/events.rs b/bin/reth/src/node/events.rs new file mode 100644 index 0000000000..d74ecea5b2 --- /dev/null +++ b/bin/reth/src/node/events.rs @@ -0,0 +1,118 @@ +//! Support for handling events emitted by node components. + +use futures::{Stream, StreamExt}; +use reth_network::{NetworkEvent, NetworkHandle}; +use reth_network_api::PeersInfo; +use reth_primitives::BlockNumber; +use reth_stages::{PipelineEvent, StageId}; +use std::time::Duration; +use tracing::{info, warn}; + +/// The current high-level state of the node. +struct NodeState { + /// Connection to the network + network: Option, + /// The stage currently being executed. + current_stage: Option, + /// The current checkpoint of the executing stage. + current_checkpoint: BlockNumber, +} + +impl NodeState { + fn new(network: Option) -> Self { + Self { network, current_stage: None, current_checkpoint: 0 } + } + + fn num_connected_peers(&self) -> usize { + self.network.as_ref().map(|net| net.num_connected_peers()).unwrap_or_default() + } + + /// Processes an event emitted by the pipeline + async fn handle_pipeline_event(&mut self, event: PipelineEvent) { + match event { + PipelineEvent::Running { stage_id, stage_progress } => { + let notable = self.current_stage.is_none(); + self.current_stage = Some(stage_id); + self.current_checkpoint = stage_progress.unwrap_or_default(); + + if notable { + info!(target: "reth::cli", stage = %stage_id, from = stage_progress, "Executing stage"); + } + } + PipelineEvent::Ran { stage_id, result } => { + let notable = result.stage_progress > self.current_checkpoint; + self.current_checkpoint = result.stage_progress; + if result.done { + self.current_stage = None; + info!(target: "reth::cli", stage = %stage_id, checkpoint = result.stage_progress, "Stage finished executing"); + } else if notable { + info!(target: "reth::cli", stage = %stage_id, checkpoint = result.stage_progress, "Stage committed progress"); + } + } + _ => (), + } + } + + async fn handle_network_event(&mut self, event: NetworkEvent) { + match event { + NetworkEvent::SessionEstablished { peer_id, status, .. } => { + info!(target: "reth::cli", connected_peers = self.num_connected_peers(), peer_id = %peer_id, best_block = %status.blockhash, "Peer connected"); + } + NetworkEvent::SessionClosed { peer_id, reason } => { + let reason = reason.map(|s| s.to_string()).unwrap_or_else(|| "None".to_string()); + warn!(target: "reth::cli", connected_peers = self.num_connected_peers(), peer_id = %peer_id, %reason, "Peer disconnected."); + } + _ => (), + } + } +} + +/// A node event. +pub enum NodeEvent { + /// A network event. + Network(NetworkEvent), + /// A sync pipeline event. + Pipeline(PipelineEvent), +} + +impl From for NodeEvent { + fn from(evt: NetworkEvent) -> NodeEvent { + NodeEvent::Network(evt) + } +} + +impl From for NodeEvent { + fn from(evt: PipelineEvent) -> NodeEvent { + NodeEvent::Pipeline(evt) + } +} + +/// Displays relevant information to the user from components of the node, and periodically +/// displays the high-level status of the node. +pub async fn handle_events( + network: Option, + mut events: impl Stream + Unpin, +) { + let mut state = NodeState::new(network); + + let mut interval = tokio::time::interval(Duration::from_secs(30)); + interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay); + loop { + tokio::select! { + Some(event) = events.next() => { + match event { + NodeEvent::Network(event) => { + state.handle_network_event(event).await; + }, + NodeEvent::Pipeline(event) => { + state.handle_pipeline_event(event).await; + } + } + }, + _ = interval.tick() => { + let stage = state.current_stage.map(|id| id.to_string()).unwrap_or_else(|| "None".to_string()); + info!(target: "reth::cli", connected_peers = state.num_connected_peers(), %stage, checkpoint = state.current_checkpoint, "Status"); + } + } + } +} diff --git a/bin/reth/src/node/mod.rs b/bin/reth/src/node/mod.rs index d2e9e3131c..1c24c6ecf1 100644 --- a/bin/reth/src/node/mod.rs +++ b/bin/reth/src/node/mod.rs @@ -9,6 +9,7 @@ use crate::{ utils::get_single_header, }; use clap::{crate_version, Parser}; +use events::NodeEvent; use eyre::Context; use fdlimit::raise_fd_limit; use futures::{pin_mut, stream::select as stream_select, Stream, StreamExt}; @@ -32,10 +33,10 @@ use reth_interfaces::{ sync::SyncStateUpdater, }; use reth_network::{ - error::NetworkError, FetchClient, NetworkConfig, NetworkEvent, NetworkHandle, NetworkManager, + error::NetworkError, FetchClient, NetworkConfig, NetworkHandle, NetworkManager, }; use reth_network_api::NetworkInfo; -use reth_primitives::{BlockHashOrNumber, BlockNumber, ChainSpec, Head, H256}; +use reth_primitives::{BlockHashOrNumber, ChainSpec, Head, H256}; use reth_provider::{BlockProvider, HeaderProvider, ShareableDatabase}; use reth_rpc_engine_api::{EngineApi, EngineApiHandle}; use reth_staged_sync::{ @@ -51,10 +52,12 @@ use reth_stages::{ stages::{ExecutionStage, SenderRecoveryStage, TotalDifficultyStage, FINISH}, }; use reth_tasks::TaskExecutor; -use std::{net::SocketAddr, path::PathBuf, sync::Arc, time::Duration}; +use std::{net::SocketAddr, path::PathBuf, sync::Arc}; use tokio::sync::{mpsc::unbounded_channel, watch}; use tracing::*; +pub mod events; + /// Start the node #[derive(Debug, Parser)] pub struct Command { @@ -181,7 +184,7 @@ impl Command { ) .await?; - ctx.task_executor.spawn(handle_events(events)); + ctx.task_executor.spawn(events::handle_events(Some(network.clone()), events)); // Run pipeline let (rx, tx) = tokio::sync::oneshot::channel(); @@ -477,106 +480,6 @@ async fn run_network_until_shutdown( } } -/// The current high-level state of the node. -#[derive(Default)] -struct NodeState { - /// The number of connected peers. - connected_peers: usize, - /// The stage currently being executed. - current_stage: Option, - /// The current checkpoint of the executing stage. - current_checkpoint: BlockNumber, -} - -impl NodeState { - async fn handle_pipeline_event(&mut self, event: PipelineEvent) { - match event { - PipelineEvent::Running { stage_id, stage_progress } => { - let notable = self.current_stage.is_none(); - self.current_stage = Some(stage_id); - self.current_checkpoint = stage_progress.unwrap_or_default(); - - if notable { - info!(target: "reth::cli", stage = %stage_id, from = stage_progress, "Executing stage"); - } - } - PipelineEvent::Ran { stage_id, result } => { - let notable = result.stage_progress > self.current_checkpoint; - self.current_checkpoint = result.stage_progress; - if result.done { - self.current_stage = None; - info!(target: "reth::cli", stage = %stage_id, checkpoint = result.stage_progress, "Stage finished executing"); - } else if notable { - info!(target: "reth::cli", stage = %stage_id, checkpoint = result.stage_progress, "Stage committed progress"); - } - } - _ => (), - } - } - - async fn handle_network_event(&mut self, event: NetworkEvent) { - match event { - NetworkEvent::SessionEstablished { peer_id, status, .. } => { - self.connected_peers += 1; - info!(target: "reth::cli", connected_peers = self.connected_peers, peer_id = %peer_id, best_block = %status.blockhash, "Peer connected"); - } - NetworkEvent::SessionClosed { peer_id, reason } => { - self.connected_peers -= 1; - let reason = reason.map(|s| s.to_string()).unwrap_or_else(|| "None".to_string()); - warn!(target: "reth::cli", connected_peers = self.connected_peers, peer_id = %peer_id, %reason, "Peer disconnected."); - } - _ => (), - } - } -} - -/// A node event. -pub enum NodeEvent { - /// A network event. - Network(NetworkEvent), - /// A sync pipeline event. - Pipeline(PipelineEvent), -} - -impl From for NodeEvent { - fn from(evt: NetworkEvent) -> NodeEvent { - NodeEvent::Network(evt) - } -} - -impl From for NodeEvent { - fn from(evt: PipelineEvent) -> NodeEvent { - NodeEvent::Pipeline(evt) - } -} - -/// Displays relevant information to the user from components of the node, and periodically -/// displays the high-level status of the node. -pub async fn handle_events(mut events: impl Stream + Unpin) { - let mut state = NodeState::default(); - - let mut interval = tokio::time::interval(Duration::from_secs(30)); - interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay); - loop { - tokio::select! { - Some(event) = events.next() => { - match event { - NodeEvent::Network(event) => { - state.handle_network_event(event).await; - }, - NodeEvent::Pipeline(event) => { - state.handle_pipeline_event(event).await; - } - } - }, - _ = interval.tick() => { - let stage = state.current_stage.map(|id| id.to_string()).unwrap_or_else(|| "None".to_string()); - info!(target: "reth::cli", connected_peers = state.connected_peers, %stage, checkpoint = state.current_checkpoint, "Status"); - } - } - } -} - #[cfg(test)] mod tests { use super::*;