diff --git a/crates/consensus/beacon/src/engine/metrics.rs b/crates/consensus/beacon/src/engine/metrics.rs index c9a62e5ba4..14d68f4dd7 100644 --- a/crates/consensus/beacon/src/engine/metrics.rs +++ b/crates/consensus/beacon/src/engine/metrics.rs @@ -1,12 +1,12 @@ use reth_metrics::{ - metrics::{self, Counter}, + metrics::{self, Counter, Gauge}, Metrics, }; /// Beacon consensus engine metrics. #[derive(Metrics)] #[metrics(scope = "consensus.engine.beacon")] -pub(crate) struct Metrics { +pub(crate) struct EngineMetrics { /// The number of times the pipeline was run. pub(crate) pipeline_runs: Counter, /// The total count of forkchoice updated messages received. @@ -14,3 +14,11 @@ pub(crate) struct Metrics { /// The total count of new payload messages received. pub(crate) new_payload_messages: Counter, } + +/// Metrics for the `EngineSyncController`. +#[derive(Metrics)] +#[metrics(scope = "consensus.engine.beacon")] +pub(crate) struct EngineSyncMetrics { + /// How many blocks are currently being downloaded. + pub(crate) active_block_downloads: Gauge, +} diff --git a/crates/consensus/beacon/src/engine/mod.rs b/crates/consensus/beacon/src/engine/mod.rs index 7d6333e216..e0166d91d9 100644 --- a/crates/consensus/beacon/src/engine/mod.rs +++ b/crates/consensus/beacon/src/engine/mod.rs @@ -1,5 +1,5 @@ use crate::{ - engine::{message::OnForkChoiceUpdated, metrics::Metrics}, + engine::{message::OnForkChoiceUpdated, metrics::EngineMetrics}, sync::{EngineSyncController, EngineSyncEvent}, }; use futures::{Future, StreamExt, TryFutureExt}; @@ -232,7 +232,7 @@ where /// invalid. invalid_headers: InvalidHeaderCache, /// Consensus engine metrics. - metrics: Metrics, + metrics: EngineMetrics, } impl BeaconConsensusEngine @@ -318,7 +318,7 @@ where payload_builder, listeners: EventListeners::default(), invalid_headers: InvalidHeaderCache::new(MAX_INVALID_HEADERS), - metrics: Metrics::default(), + metrics: EngineMetrics::default(), }; let maybe_pipeline_target = match target { diff --git a/crates/consensus/beacon/src/engine/sync.rs b/crates/consensus/beacon/src/engine/sync.rs index cc678eca22..a093b57bae 100644 --- a/crates/consensus/beacon/src/engine/sync.rs +++ b/crates/consensus/beacon/src/engine/sync.rs @@ -1,5 +1,6 @@ //! Sync management for the engine implementation. +use crate::engine::metrics::EngineSyncMetrics; use futures::FutureExt; use reth_db::database::Database; use reth_interfaces::p2p::{ @@ -47,6 +48,8 @@ where /// Max block after which the consensus engine would terminate the sync. Used for debugging /// purposes. max_block: Option, + /// Engine sync metrics. + metrics: EngineSyncMetrics, } impl EngineSyncController @@ -71,9 +74,15 @@ where queued_events: VecDeque::new(), run_pipeline_continuously, max_block, + metrics: EngineSyncMetrics::default(), } } + /// Sets the metrics for the active downloads + fn update_block_download_metrics(&self) { + self.metrics.active_block_downloads.set(self.inflight_full_block_requests.len() as f64); + } + /// Sets the max block value for testing #[cfg(test)] pub(crate) fn set_max_block(&mut self, block: BlockNumber) { @@ -83,11 +92,13 @@ where /// Cancels all full block requests that are in progress. pub(crate) fn clear_full_block_requests(&mut self) { self.inflight_full_block_requests.clear(); + self.update_block_download_metrics(); } /// Cancels the full block request with the given hash. pub(crate) fn cancel_full_block_request(&mut self, hash: H256) { self.inflight_full_block_requests.retain(|req| *req.hash() != hash); + self.update_block_download_metrics(); } /// Returns whether or not the sync controller is set to run the pipeline continuously. @@ -125,6 +136,9 @@ where ); let request = self.full_block_client.get_full_block(hash); self.inflight_full_block_requests.push(request); + + self.update_block_download_metrics(); + true } @@ -241,6 +255,8 @@ where } } + self.update_block_download_metrics(); + if !self.pipeline_state.is_idle() || self.queued_events.is_empty() { // can not make any progress return Poll::Pending