refactor: move consensus layer events to launch context (#17117)

This commit is contained in:
Matthias Seitz
2025-06-28 01:19:05 +02:00
committed by GitHub
parent bfd745117b
commit 31d0bb1d58
2 changed files with 29 additions and 10 deletions

View File

@@ -65,6 +65,9 @@ use tokio::sync::{
oneshot, watch,
};
use futures::{future::Either, stream, Stream, StreamExt};
use reth_node_events::{cl::ConsensusLayerHealthEvents, node::NodeEvent};
/// Reusable setup for launching a node.
///
/// This provides commonly used boilerplate for launching a node.
@@ -972,6 +975,29 @@ where
|| node_config.datadir().data_dir().join("era").into(),
)
}
/// Creates consensus layer health events stream based on node configuration.
///
/// Returns a stream that monitors consensus layer health if:
/// - No debug tip is configured
/// - Not running in dev mode
///
/// Otherwise returns an empty stream.
pub fn consensus_layer_events(
&self,
) -> impl Stream<Item = NodeEvent<PrimitivesTy<T::Types>>> + 'static
where
T::Provider: reth_provider::CanonChainTracker,
{
if self.node_config().debug.tip.is_none() && !self.is_dev() {
Either::Left(
ConsensusLayerHealthEvents::new(Box::new(self.blockchain_db().clone()))
.map(Into::into),
)
} else {
Either::Right(stream::empty())
}
}
}
impl<T, CB>

View File

@@ -9,7 +9,7 @@ use crate::{
NodeBuilderWithComponents, NodeComponents, NodeComponentsBuilder, NodeHandle, NodeTypesAdapter,
};
use alloy_consensus::BlockHeader;
use futures::{future::Either, stream, stream_select, StreamExt};
use futures::{stream_select, StreamExt};
use reth_chainspec::{EthChainSpec, EthereumHardforks};
use reth_db_api::{database_metrics::DatabaseMetrics, Database};
use reth_engine_local::{LocalMiner, LocalPayloadAttributesBuilder};
@@ -31,7 +31,7 @@ use reth_node_core::{
exit::NodeExitFuture,
primitives::Head,
};
use reth_node_events::{cl::ConsensusLayerHealthEvents, node};
use reth_node_events::node;
use reth_provider::{
providers::{BlockchainProvider, NodeTypesForProvider},
BlockNumReader,
@@ -249,14 +249,7 @@ where
let events = stream_select!(
event_sender.new_listener().map(Into::into),
pipeline_events.map(Into::into),
if ctx.node_config().debug.tip.is_none() && !ctx.is_dev() {
Either::Left(
ConsensusLayerHealthEvents::new(Box::new(ctx.blockchain_db().clone()))
.map(Into::into),
)
} else {
Either::Right(stream::empty())
},
ctx.consensus_layer_events(),
pruner_events.map(Into::into),
static_file_producer_events.map(Into::into),
);