mirror of
https://github.com/paradigmxyz/reth.git
synced 2026-01-30 09:38:24 -05:00
feat(stages, tree): update sync metrics from blockchain tree (#3507)
Co-authored-by: Georgios Konstantopoulos <me@gakonst.com>
This commit is contained in:
1
Cargo.lock
generated
1
Cargo.lock
generated
@@ -5069,6 +5069,7 @@ dependencies = [
|
||||
"reth-metrics",
|
||||
"reth-primitives",
|
||||
"reth-provider",
|
||||
"reth-stages",
|
||||
"tokio",
|
||||
"tracing",
|
||||
]
|
||||
|
||||
@@ -183,6 +183,11 @@ impl Command {
|
||||
|
||||
self.init_trusted_nodes(&mut config);
|
||||
|
||||
debug!(target: "reth::cli", "Spawning metrics listener task");
|
||||
let (metrics_tx, metrics_rx) = unbounded_channel();
|
||||
let metrics_listener = MetricsListener::new(metrics_rx);
|
||||
ctx.task_executor.spawn_critical("metrics listener task", metrics_listener);
|
||||
|
||||
// configure blockchain tree
|
||||
let tree_externals = TreeExternals::new(
|
||||
db.clone(),
|
||||
@@ -195,11 +200,14 @@ impl Command {
|
||||
// depth at least N blocks must be sent at once.
|
||||
let (canon_state_notification_sender, _receiver) =
|
||||
tokio::sync::broadcast::channel(tree_config.max_reorg_depth() as usize * 2);
|
||||
let blockchain_tree = ShareableBlockchainTree::new(BlockchainTree::new(
|
||||
tree_externals,
|
||||
canon_state_notification_sender.clone(),
|
||||
tree_config,
|
||||
)?);
|
||||
let blockchain_tree = ShareableBlockchainTree::new(
|
||||
BlockchainTree::new(
|
||||
tree_externals,
|
||||
canon_state_notification_sender.clone(),
|
||||
tree_config,
|
||||
)?
|
||||
.with_sync_metrics_tx(metrics_tx.clone()),
|
||||
);
|
||||
|
||||
// setup the blockchain provider
|
||||
let factory = ProviderFactory::new(Arc::clone(&db), Arc::clone(&self.chain));
|
||||
@@ -278,11 +286,6 @@ impl Command {
|
||||
debug!(target: "reth::cli", "Spawning payload builder service");
|
||||
ctx.task_executor.spawn_critical("payload builder service", payload_service);
|
||||
|
||||
debug!(target: "reth::cli", "Spawning metrics listener task");
|
||||
let (metrics_tx, metrics_rx) = unbounded_channel();
|
||||
let metrics_listener = MetricsListener::new(metrics_rx);
|
||||
ctx.task_executor.spawn_critical("metrics listener task", metrics_listener);
|
||||
|
||||
let max_block = if let Some(block) = self.debug.max_block {
|
||||
Some(block)
|
||||
} else if let Some(tip) = self.debug.tip {
|
||||
@@ -687,7 +690,7 @@ impl Command {
|
||||
if continuous { HeaderSyncMode::Continuous } else { HeaderSyncMode::Tip(tip_rx) };
|
||||
let pipeline = builder
|
||||
.with_tip_sender(tip_tx)
|
||||
.with_metric_events(metrics_tx)
|
||||
.with_metrics_tx(metrics_tx)
|
||||
.add_stages(
|
||||
DefaultStages::new(
|
||||
header_mode,
|
||||
|
||||
@@ -20,6 +20,7 @@ reth-interfaces = { workspace = true }
|
||||
reth-db = { path = "../storage/db" }
|
||||
reth-metrics = { workspace = true, features = ["common"] }
|
||||
reth-provider = { workspace = true }
|
||||
reth-stages = { path = "../stages" }
|
||||
|
||||
# common
|
||||
parking_lot = { version = "0.12" }
|
||||
|
||||
@@ -26,6 +26,7 @@ use reth_provider::{
|
||||
CanonStateNotificationSender, CanonStateNotifications, Chain, DatabaseProvider,
|
||||
DisplayBlocksChain, ExecutorFactory, HeaderProvider,
|
||||
};
|
||||
use reth_stages::{MetricEvent, MetricEventsSender};
|
||||
use std::{
|
||||
collections::{BTreeMap, HashMap},
|
||||
sync::Arc,
|
||||
@@ -90,6 +91,8 @@ pub struct BlockchainTree<DB: Database, C: Consensus, EF: ExecutorFactory> {
|
||||
canon_state_notification_sender: CanonStateNotificationSender,
|
||||
/// Metrics for the blockchain tree.
|
||||
metrics: TreeMetrics,
|
||||
/// Metrics for sync stages.
|
||||
sync_metrics_tx: Option<MetricEventsSender>,
|
||||
}
|
||||
|
||||
/// A container that wraps chains and block indices to allow searching for block hashes across all
|
||||
@@ -141,9 +144,16 @@ impl<DB: Database, C: Consensus, EF: ExecutorFactory> BlockchainTree<DB, C, EF>
|
||||
config,
|
||||
canon_state_notification_sender,
|
||||
metrics: Default::default(),
|
||||
sync_metrics_tx: None,
|
||||
})
|
||||
}
|
||||
|
||||
/// Set the sync metric events sender.
|
||||
pub fn with_sync_metrics_tx(mut self, metrics_tx: MetricEventsSender) -> Self {
|
||||
self.sync_metrics_tx = Some(metrics_tx);
|
||||
self
|
||||
}
|
||||
|
||||
/// Check if then block is known to blockchain tree or database and return its status.
|
||||
///
|
||||
/// Function will check:
|
||||
@@ -1074,10 +1084,15 @@ impl<DB: Database, C: Consensus, EF: ExecutorFactory> BlockchainTree<DB, C, EF>
|
||||
}
|
||||
}
|
||||
|
||||
/// Update blockchain tree metrics
|
||||
pub(crate) fn update_tree_metrics(&self) {
|
||||
/// Update blockchain tree and sync metrics
|
||||
pub(crate) fn update_metrics(&mut self) {
|
||||
let height = self.canonical_chain().tip().number;
|
||||
|
||||
self.metrics.sidechains.set(self.chains.len() as f64);
|
||||
self.metrics.canonical_chain_height.set(self.canonical_chain().tip().number as f64);
|
||||
self.metrics.canonical_chain_height.set(height as f64);
|
||||
if let Some(metrics_tx) = self.sync_metrics_tx.as_mut() {
|
||||
let _ = metrics_tx.send(MetricEvent::SyncHeight { height });
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -44,7 +44,7 @@ impl<DB: Database, C: Consensus, EF: ExecutorFactory> BlockchainTreeEngine
|
||||
fn buffer_block(&self, block: SealedBlockWithSenders) -> Result<(), InsertBlockError> {
|
||||
let mut tree = self.tree.write();
|
||||
let res = tree.buffer_block(block);
|
||||
tree.update_tree_metrics();
|
||||
tree.update_metrics();
|
||||
res
|
||||
}
|
||||
|
||||
@@ -55,7 +55,7 @@ impl<DB: Database, C: Consensus, EF: ExecutorFactory> BlockchainTreeEngine
|
||||
trace!(target: "blockchain_tree", hash=?block.hash, number=block.number, parent_hash=?block.parent_hash, "Inserting block");
|
||||
let mut tree = self.tree.write();
|
||||
let res = tree.insert_block(block);
|
||||
tree.update_tree_metrics();
|
||||
tree.update_metrics();
|
||||
res
|
||||
}
|
||||
|
||||
@@ -63,14 +63,14 @@ impl<DB: Database, C: Consensus, EF: ExecutorFactory> BlockchainTreeEngine
|
||||
trace!(target: "blockchain_tree", ?finalized_block, "Finalizing block");
|
||||
let mut tree = self.tree.write();
|
||||
tree.finalize_block(finalized_block);
|
||||
tree.update_tree_metrics();
|
||||
tree.update_metrics();
|
||||
}
|
||||
|
||||
fn restore_canonical_hashes(&self, last_finalized_block: BlockNumber) -> Result<(), Error> {
|
||||
trace!(target: "blockchain_tree", ?last_finalized_block, "Restoring canonical hashes for last finalized block");
|
||||
let mut tree = self.tree.write();
|
||||
let res = tree.restore_canonical_hashes(last_finalized_block);
|
||||
tree.update_tree_metrics();
|
||||
tree.update_metrics();
|
||||
res
|
||||
}
|
||||
|
||||
@@ -78,7 +78,7 @@ impl<DB: Database, C: Consensus, EF: ExecutorFactory> BlockchainTreeEngine
|
||||
trace!(target: "blockchain_tree", ?block_hash, "Making block canonical");
|
||||
let mut tree = self.tree.write();
|
||||
let res = tree.make_canonical(block_hash);
|
||||
tree.update_tree_metrics();
|
||||
tree.update_metrics();
|
||||
res
|
||||
}
|
||||
|
||||
@@ -86,7 +86,7 @@ impl<DB: Database, C: Consensus, EF: ExecutorFactory> BlockchainTreeEngine
|
||||
trace!(target: "blockchain_tree", ?unwind_to, "Unwinding to block number");
|
||||
let mut tree = self.tree.write();
|
||||
let res = tree.unwind(unwind_to);
|
||||
tree.update_tree_metrics();
|
||||
tree.update_metrics();
|
||||
res
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1,4 +1,4 @@
|
||||
use crate::metrics::{StageMetrics, SyncMetrics};
|
||||
use crate::metrics::SyncMetrics;
|
||||
use reth_primitives::{
|
||||
stage::{StageCheckpoint, StageId},
|
||||
BlockNumber,
|
||||
@@ -16,6 +16,11 @@ pub type MetricEventsSender = UnboundedSender<MetricEvent>;
|
||||
/// Collection of metric events.
|
||||
#[derive(Clone, Copy, Debug)]
|
||||
pub enum MetricEvent {
|
||||
/// Sync reached new height. All stage checkpoints are updated.
|
||||
SyncHeight {
|
||||
/// Maximum height measured in block number that sync reached.
|
||||
height: BlockNumber,
|
||||
},
|
||||
/// Stage reached new checkpoint.
|
||||
StageCheckpoint {
|
||||
/// Stage ID.
|
||||
@@ -44,10 +49,14 @@ impl MetricsListener {
|
||||
|
||||
fn handle_event(&mut self, event: MetricEvent) {
|
||||
match event {
|
||||
MetricEvent::SyncHeight { height } => {
|
||||
for stage_id in StageId::ALL {
|
||||
let stage_metrics = self.sync_metrics.get_stage_metrics(stage_id);
|
||||
stage_metrics.checkpoint.set(height as f64);
|
||||
}
|
||||
}
|
||||
MetricEvent::StageCheckpoint { stage_id, checkpoint, max_block_number } => {
|
||||
let stage_metrics = self.sync_metrics.stages.entry(stage_id).or_insert_with(|| {
|
||||
StageMetrics::new_with_labels(&[("stage", stage_id.to_string())])
|
||||
});
|
||||
let stage_metrics = self.sync_metrics.get_stage_metrics(stage_id);
|
||||
|
||||
stage_metrics.checkpoint.set(checkpoint.block_number as f64);
|
||||
|
||||
|
||||
@@ -10,6 +10,15 @@ pub(crate) struct SyncMetrics {
|
||||
pub(crate) stages: HashMap<StageId, StageMetrics>,
|
||||
}
|
||||
|
||||
impl SyncMetrics {
|
||||
/// Returns existing or initializes a new instance of [StageMetrics] for the provided [StageId].
|
||||
pub(crate) fn get_stage_metrics(&mut self, stage_id: StageId) -> &mut StageMetrics {
|
||||
self.stages
|
||||
.entry(stage_id)
|
||||
.or_insert_with(|| StageMetrics::new_with_labels(&[("stage", stage_id.to_string())]))
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Metrics)]
|
||||
#[metrics(scope = "sync")]
|
||||
pub(crate) struct StageMetrics {
|
||||
|
||||
@@ -62,7 +62,7 @@ where
|
||||
}
|
||||
|
||||
/// Set the metric events sender.
|
||||
pub fn with_metric_events(mut self, metrics_tx: MetricEventsSender) -> Self {
|
||||
pub fn with_metrics_tx(mut self, metrics_tx: MetricEventsSender) -> Self {
|
||||
self.metrics_tx = Some(metrics_tx);
|
||||
self
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user