perf(db): throttle metrics reporting (#20974)

Co-authored-by: Matthias Seitz <matthias.seitz@outlook.de>
Co-authored-by: Claude Opus 4.5 <noreply@anthropic.com>
This commit is contained in:
DaniPopes
2026-01-12 22:44:24 +00:00
committed by GitHub
parent 98a35cc870
commit c5e00e4aeb
4 changed files with 117 additions and 34 deletions

View File

@@ -11,7 +11,6 @@ use reth_cli::chainspec::ChainSpecParser;
use reth_cli_runner::CliContext;
use reth_cli_util::get_secret_key;
use reth_config::config::{HashingConfig, SenderRecoveryConfig, TransactionLookupConfig};
use reth_db_api::database_metrics::DatabaseMetrics;
use reth_downloaders::{
bodies::bodies::BodiesDownloaderBuilder,
headers::reverse_headers::ReverseHeadersDownloaderBuilder,
@@ -19,19 +18,19 @@ use reth_downloaders::{
use reth_exex::ExExManagerHandle;
use reth_network::BlockDownloaderProvider;
use reth_network_p2p::HeadersClient;
use reth_node_builder::common::metrics_hooks;
use reth_node_core::{
args::{NetworkArgs, StageEnum},
version::version_metadata,
};
use reth_node_metrics::{
chain::ChainSpecInfo,
hooks::Hooks,
server::{MetricServer, MetricServerConfig},
version::VersionInfo,
};
use reth_provider::{
ChainSpecProvider, DBProvider, DatabaseProviderFactory, StageCheckpointReader,
StageCheckpointWriter, StaticFileProviderFactory,
StageCheckpointWriter,
};
use reth_stages::{
stages::{
@@ -139,20 +138,7 @@ impl<C: ChainSpecParser<ChainSpec: EthChainSpec + Hardforks + EthereumHardforks>
},
ChainSpecInfo { name: provider_factory.chain_spec().chain().to_string() },
ctx.task_executor,
Hooks::builder()
.with_hook({
let db = provider_factory.db_ref().clone();
move || db.report_metrics()
})
.with_hook({
let sfp = provider_factory.static_file_provider();
move || {
if let Err(error) = sfp.report_metrics() {
error!(%error, "Failed to report metrics from static file provider");
}
}
})
.build(),
metrics_hooks(&provider_factory),
data_dir.pprof_dumps(),
);

View File

@@ -79,9 +79,12 @@ use reth_stages::{
};
use reth_static_file::StaticFileProducer;
use reth_tasks::TaskExecutor;
use reth_tracing::tracing::{debug, error, info, warn};
use reth_tracing::{
throttle,
tracing::{debug, error, info, warn},
};
use reth_transaction_pool::TransactionPool;
use std::{sync::Arc, thread::available_parallelism};
use std::{sync::Arc, thread::available_parallelism, time::Duration};
use tokio::sync::{
mpsc::{unbounded_channel, UnboundedSender},
oneshot, watch,
@@ -650,23 +653,13 @@ where
},
ChainSpecInfo { name: self.chain_id().to_string() },
self.task_executor().clone(),
Hooks::builder()
.with_hook({
let db = self.database().clone();
move || db.report_metrics()
})
.with_hook({
let sfp = self.static_file_provider();
move || {
if let Err(error) = sfp.report_metrics() {
error!(%error, "Failed to report metrics for the static file provider");
}
}
})
.build(),
metrics_hooks(self.provider_factory()),
self.data_dir().pprof_dumps(),
)
.with_push_gateway(self.node_config().metrics.push_gateway_url.clone(), self.node_config().metrics.push_gateway_interval);
.with_push_gateway(
self.node_config().metrics.push_gateway_url.clone(),
self.node_config().metrics.push_gateway_interval,
);
MetricServer::new(config).serve().await?;
}
@@ -1266,6 +1259,26 @@ where
head: Head,
}
/// Returns the metrics hooks for the node.
pub fn metrics_hooks<N: NodeTypesWithDB>(provider_factory: &ProviderFactory<N>) -> Hooks {
Hooks::builder()
.with_hook({
let db = provider_factory.db_ref().clone();
move || throttle!(Duration::from_secs(5 * 60), || db.report_metrics())
})
.with_hook({
let sfp = provider_factory.static_file_provider();
move || {
throttle!(Duration::from_secs(5 * 60), || {
if let Err(error) = sfp.report_metrics() {
error!(%error, "Failed to report metrics from static file provider");
}
})
}
})
.build()
}
#[cfg(test)]
mod tests {
use super::{LaunchContext, NodeConfig};

View File

@@ -56,9 +56,15 @@ pub use formatter::LogFormat;
pub use layers::{FileInfo, FileWorkerGuard, Layers};
pub use test_tracer::TestTracer;
#[doc(hidden)]
pub mod __private {
pub use super::throttle::*;
}
mod formatter;
mod layers;
mod test_tracer;
mod throttle;
use tracing::level_filters::LevelFilter;
use tracing_appender::non_blocking::WorkerGuard;

View File

@@ -0,0 +1,78 @@
//! Throttling utilities for rate-limiting expression execution.
use std::{
sync::{
atomic::{AtomicU64, Ordering},
LazyLock,
},
time::Instant,
};
/// Sentinel value indicating the throttle has never run.
#[doc(hidden)]
pub const NOT_YET_RUN: u64 = u64::MAX;
/// Checks if enough time has passed since the last run. Implementation detail for [`throttle!`].
#[doc(hidden)]
pub fn should_run(start: &LazyLock<Instant>, last: &AtomicU64, duration_millis: u64) -> bool {
let now = start.elapsed().as_millis() as u64;
let last_val = last.load(Ordering::Relaxed);
if last_val == NOT_YET_RUN {
return last
.compare_exchange(NOT_YET_RUN, now, Ordering::Relaxed, Ordering::Relaxed)
.is_ok();
}
if now.saturating_sub(last_val) >= duration_millis {
last.compare_exchange(last_val, now, Ordering::Relaxed, Ordering::Relaxed).is_ok()
} else {
false
}
}
/// Throttles the execution of an expression to run at most once per specified duration.
///
/// Uses static variables with lazy initialization to track the last execution time.
/// Thread-safe via atomic operations.
///
/// # Examples
///
/// ```ignore
/// use std::time::Duration;
/// use reth_tracing::throttle;
///
/// // Log at most once per second.
/// throttle!(Duration::from_secs(1), || {
/// tracing::info!("This message is throttled");
/// });
/// ```
#[macro_export]
macro_rules! throttle {
($duration:expr, || $expr:expr) => {{
static START: ::std::sync::LazyLock<::std::time::Instant> =
::std::sync::LazyLock::new(::std::time::Instant::now);
static LAST: ::core::sync::atomic::AtomicU64 =
::core::sync::atomic::AtomicU64::new($crate::__private::NOT_YET_RUN);
if $crate::__private::should_run(&START, &LAST, $duration.as_millis() as u64) {
$expr
}
}};
}
#[cfg(test)]
mod tests {
use std::sync::atomic::{AtomicUsize, Ordering};
#[test]
fn test_throttle_runs_once_initially() {
static COUNTER: AtomicUsize = AtomicUsize::new(0);
throttle!(std::time::Duration::from_secs(10), || {
COUNTER.fetch_add(1, Ordering::SeqCst);
});
assert_eq!(COUNTER.load(Ordering::SeqCst), 1);
}
}