diff --git a/Cargo.lock b/Cargo.lock index dc6fb0b656..04d8ee92c4 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -5069,6 +5069,7 @@ dependencies = [ "reth-metrics", "reth-primitives", "reth-provider", + "reth-stages", "tokio", "tracing", ] diff --git a/bin/reth/src/node/mod.rs b/bin/reth/src/node/mod.rs index 7a67b006e4..905bf32d04 100644 --- a/bin/reth/src/node/mod.rs +++ b/bin/reth/src/node/mod.rs @@ -183,6 +183,11 @@ impl Command { self.init_trusted_nodes(&mut config); + debug!(target: "reth::cli", "Spawning metrics listener task"); + let (metrics_tx, metrics_rx) = unbounded_channel(); + let metrics_listener = MetricsListener::new(metrics_rx); + ctx.task_executor.spawn_critical("metrics listener task", metrics_listener); + // configure blockchain tree let tree_externals = TreeExternals::new( db.clone(), @@ -195,11 +200,14 @@ impl Command { // depth at least N blocks must be sent at once. let (canon_state_notification_sender, _receiver) = tokio::sync::broadcast::channel(tree_config.max_reorg_depth() as usize * 2); - let blockchain_tree = ShareableBlockchainTree::new(BlockchainTree::new( - tree_externals, - canon_state_notification_sender.clone(), - tree_config, - )?); + let blockchain_tree = ShareableBlockchainTree::new( + BlockchainTree::new( + tree_externals, + canon_state_notification_sender.clone(), + tree_config, + )? + .with_sync_metrics_tx(metrics_tx.clone()), + ); // setup the blockchain provider let factory = ProviderFactory::new(Arc::clone(&db), Arc::clone(&self.chain)); @@ -278,11 +286,6 @@ impl Command { debug!(target: "reth::cli", "Spawning payload builder service"); ctx.task_executor.spawn_critical("payload builder service", payload_service); - debug!(target: "reth::cli", "Spawning metrics listener task"); - let (metrics_tx, metrics_rx) = unbounded_channel(); - let metrics_listener = MetricsListener::new(metrics_rx); - ctx.task_executor.spawn_critical("metrics listener task", metrics_listener); - let max_block = if let Some(block) = self.debug.max_block { Some(block) } else if let Some(tip) = self.debug.tip { @@ -687,7 +690,7 @@ impl Command { if continuous { HeaderSyncMode::Continuous } else { HeaderSyncMode::Tip(tip_rx) }; let pipeline = builder .with_tip_sender(tip_tx) - .with_metric_events(metrics_tx) + .with_metrics_tx(metrics_tx) .add_stages( DefaultStages::new( header_mode, diff --git a/crates/blockchain-tree/Cargo.toml b/crates/blockchain-tree/Cargo.toml index 0a4bf95590..f7c0316480 100644 --- a/crates/blockchain-tree/Cargo.toml +++ b/crates/blockchain-tree/Cargo.toml @@ -20,6 +20,7 @@ reth-interfaces = { workspace = true } reth-db = { path = "../storage/db" } reth-metrics = { workspace = true, features = ["common"] } reth-provider = { workspace = true } +reth-stages = { path = "../stages" } # common parking_lot = { version = "0.12" } diff --git a/crates/blockchain-tree/src/blockchain_tree.rs b/crates/blockchain-tree/src/blockchain_tree.rs index ce10bbd907..22d3e32e62 100644 --- a/crates/blockchain-tree/src/blockchain_tree.rs +++ b/crates/blockchain-tree/src/blockchain_tree.rs @@ -26,6 +26,7 @@ use reth_provider::{ CanonStateNotificationSender, CanonStateNotifications, Chain, DatabaseProvider, DisplayBlocksChain, ExecutorFactory, HeaderProvider, }; +use reth_stages::{MetricEvent, MetricEventsSender}; use std::{ collections::{BTreeMap, HashMap}, sync::Arc, @@ -90,6 +91,8 @@ pub struct BlockchainTree { canon_state_notification_sender: CanonStateNotificationSender, /// Metrics for the blockchain tree. metrics: TreeMetrics, + /// Metrics for sync stages. + sync_metrics_tx: Option, } /// A container that wraps chains and block indices to allow searching for block hashes across all @@ -141,9 +144,16 @@ impl BlockchainTree config, canon_state_notification_sender, metrics: Default::default(), + sync_metrics_tx: None, }) } + /// Set the sync metric events sender. + pub fn with_sync_metrics_tx(mut self, metrics_tx: MetricEventsSender) -> Self { + self.sync_metrics_tx = Some(metrics_tx); + self + } + /// Check if then block is known to blockchain tree or database and return its status. /// /// Function will check: @@ -1074,10 +1084,15 @@ impl BlockchainTree } } - /// Update blockchain tree metrics - pub(crate) fn update_tree_metrics(&self) { + /// Update blockchain tree and sync metrics + pub(crate) fn update_metrics(&mut self) { + let height = self.canonical_chain().tip().number; + self.metrics.sidechains.set(self.chains.len() as f64); - self.metrics.canonical_chain_height.set(self.canonical_chain().tip().number as f64); + self.metrics.canonical_chain_height.set(height as f64); + if let Some(metrics_tx) = self.sync_metrics_tx.as_mut() { + let _ = metrics_tx.send(MetricEvent::SyncHeight { height }); + } } } diff --git a/crates/blockchain-tree/src/shareable.rs b/crates/blockchain-tree/src/shareable.rs index ef0ea0323d..663c2e03ba 100644 --- a/crates/blockchain-tree/src/shareable.rs +++ b/crates/blockchain-tree/src/shareable.rs @@ -44,7 +44,7 @@ impl BlockchainTreeEngine fn buffer_block(&self, block: SealedBlockWithSenders) -> Result<(), InsertBlockError> { let mut tree = self.tree.write(); let res = tree.buffer_block(block); - tree.update_tree_metrics(); + tree.update_metrics(); res } @@ -55,7 +55,7 @@ impl BlockchainTreeEngine trace!(target: "blockchain_tree", hash=?block.hash, number=block.number, parent_hash=?block.parent_hash, "Inserting block"); let mut tree = self.tree.write(); let res = tree.insert_block(block); - tree.update_tree_metrics(); + tree.update_metrics(); res } @@ -63,14 +63,14 @@ impl BlockchainTreeEngine trace!(target: "blockchain_tree", ?finalized_block, "Finalizing block"); let mut tree = self.tree.write(); tree.finalize_block(finalized_block); - tree.update_tree_metrics(); + tree.update_metrics(); } fn restore_canonical_hashes(&self, last_finalized_block: BlockNumber) -> Result<(), Error> { trace!(target: "blockchain_tree", ?last_finalized_block, "Restoring canonical hashes for last finalized block"); let mut tree = self.tree.write(); let res = tree.restore_canonical_hashes(last_finalized_block); - tree.update_tree_metrics(); + tree.update_metrics(); res } @@ -78,7 +78,7 @@ impl BlockchainTreeEngine trace!(target: "blockchain_tree", ?block_hash, "Making block canonical"); let mut tree = self.tree.write(); let res = tree.make_canonical(block_hash); - tree.update_tree_metrics(); + tree.update_metrics(); res } @@ -86,7 +86,7 @@ impl BlockchainTreeEngine trace!(target: "blockchain_tree", ?unwind_to, "Unwinding to block number"); let mut tree = self.tree.write(); let res = tree.unwind(unwind_to); - tree.update_tree_metrics(); + tree.update_metrics(); res } } diff --git a/crates/stages/src/metrics/listener.rs b/crates/stages/src/metrics/listener.rs index f6672a4e68..d05560c76f 100644 --- a/crates/stages/src/metrics/listener.rs +++ b/crates/stages/src/metrics/listener.rs @@ -1,4 +1,4 @@ -use crate::metrics::{StageMetrics, SyncMetrics}; +use crate::metrics::SyncMetrics; use reth_primitives::{ stage::{StageCheckpoint, StageId}, BlockNumber, @@ -16,6 +16,11 @@ pub type MetricEventsSender = UnboundedSender; /// Collection of metric events. #[derive(Clone, Copy, Debug)] pub enum MetricEvent { + /// Sync reached new height. All stage checkpoints are updated. + SyncHeight { + /// Maximum height measured in block number that sync reached. + height: BlockNumber, + }, /// Stage reached new checkpoint. StageCheckpoint { /// Stage ID. @@ -44,10 +49,14 @@ impl MetricsListener { fn handle_event(&mut self, event: MetricEvent) { match event { + MetricEvent::SyncHeight { height } => { + for stage_id in StageId::ALL { + let stage_metrics = self.sync_metrics.get_stage_metrics(stage_id); + stage_metrics.checkpoint.set(height as f64); + } + } MetricEvent::StageCheckpoint { stage_id, checkpoint, max_block_number } => { - let stage_metrics = self.sync_metrics.stages.entry(stage_id).or_insert_with(|| { - StageMetrics::new_with_labels(&[("stage", stage_id.to_string())]) - }); + let stage_metrics = self.sync_metrics.get_stage_metrics(stage_id); stage_metrics.checkpoint.set(checkpoint.block_number as f64); diff --git a/crates/stages/src/metrics/sync_metrics.rs b/crates/stages/src/metrics/sync_metrics.rs index 859a7e6d77..ba440cb2a3 100644 --- a/crates/stages/src/metrics/sync_metrics.rs +++ b/crates/stages/src/metrics/sync_metrics.rs @@ -10,6 +10,15 @@ pub(crate) struct SyncMetrics { pub(crate) stages: HashMap, } +impl SyncMetrics { + /// Returns existing or initializes a new instance of [StageMetrics] for the provided [StageId]. + pub(crate) fn get_stage_metrics(&mut self, stage_id: StageId) -> &mut StageMetrics { + self.stages + .entry(stage_id) + .or_insert_with(|| StageMetrics::new_with_labels(&[("stage", stage_id.to_string())])) + } +} + #[derive(Metrics)] #[metrics(scope = "sync")] pub(crate) struct StageMetrics { diff --git a/crates/stages/src/pipeline/builder.rs b/crates/stages/src/pipeline/builder.rs index 5e72da6b03..7679361c83 100644 --- a/crates/stages/src/pipeline/builder.rs +++ b/crates/stages/src/pipeline/builder.rs @@ -62,7 +62,7 @@ where } /// Set the metric events sender. - pub fn with_metric_events(mut self, metrics_tx: MetricEventsSender) -> Self { + pub fn with_metrics_tx(mut self, metrics_tx: MetricEventsSender) -> Self { self.metrics_tx = Some(metrics_tx); self }