diff --git a/crates/node/builder/src/launch/common.rs b/crates/node/builder/src/launch/common.rs index 9d514e7184..f7696799e9 100644 --- a/crates/node/builder/src/launch/common.rs +++ b/crates/node/builder/src/launch/common.rs @@ -65,6 +65,9 @@ use tokio::sync::{ oneshot, watch, }; +use futures::{future::Either, stream, Stream, StreamExt}; +use reth_node_events::{cl::ConsensusLayerHealthEvents, node::NodeEvent}; + /// Reusable setup for launching a node. /// /// This provides commonly used boilerplate for launching a node. @@ -972,6 +975,29 @@ where || node_config.datadir().data_dir().join("era").into(), ) } + + /// Creates consensus layer health events stream based on node configuration. + /// + /// Returns a stream that monitors consensus layer health if: + /// - No debug tip is configured + /// - Not running in dev mode + /// + /// Otherwise returns an empty stream. + pub fn consensus_layer_events( + &self, + ) -> impl Stream>> + 'static + where + T::Provider: reth_provider::CanonChainTracker, + { + if self.node_config().debug.tip.is_none() && !self.is_dev() { + Either::Left( + ConsensusLayerHealthEvents::new(Box::new(self.blockchain_db().clone())) + .map(Into::into), + ) + } else { + Either::Right(stream::empty()) + } + } } impl diff --git a/crates/node/builder/src/launch/engine.rs b/crates/node/builder/src/launch/engine.rs index cfb9e5f501..b9eca178ac 100644 --- a/crates/node/builder/src/launch/engine.rs +++ b/crates/node/builder/src/launch/engine.rs @@ -9,7 +9,7 @@ use crate::{ NodeBuilderWithComponents, NodeComponents, NodeComponentsBuilder, NodeHandle, NodeTypesAdapter, }; use alloy_consensus::BlockHeader; -use futures::{future::Either, stream, stream_select, StreamExt}; +use futures::{stream_select, StreamExt}; use reth_chainspec::{EthChainSpec, EthereumHardforks}; use reth_db_api::{database_metrics::DatabaseMetrics, Database}; use reth_engine_local::{LocalMiner, LocalPayloadAttributesBuilder}; @@ -31,7 +31,7 @@ use reth_node_core::{ exit::NodeExitFuture, primitives::Head, }; -use reth_node_events::{cl::ConsensusLayerHealthEvents, node}; +use reth_node_events::node; use reth_provider::{ providers::{BlockchainProvider, NodeTypesForProvider}, BlockNumReader, @@ -249,14 +249,7 @@ where let events = stream_select!( event_sender.new_listener().map(Into::into), pipeline_events.map(Into::into), - if ctx.node_config().debug.tip.is_none() && !ctx.is_dev() { - Either::Left( - ConsensusLayerHealthEvents::new(Box::new(ctx.blockchain_db().clone())) - .map(Into::into), - ) - } else { - Either::Right(stream::empty()) - }, + ctx.consensus_layer_events(), pruner_events.map(Into::into), static_file_producer_events.map(Into::into), );