mirror of
https://github.com/paradigmxyz/reth.git
synced 2026-02-19 03:04:27 -05:00
feat(reth-bench): add prometheus metrics scraper (#22244)
Co-authored-by: Amp <amp@ampcode.com> Co-authored-by: DaniPopes <57450786+DaniPopes@users.noreply.github.com>
This commit is contained in:
committed by
GitHub
parent
26470cadfc
commit
cd32e3cc05
156
bin/reth-bench/src/bench/metrics_scraper.rs
Normal file
156
bin/reth-bench/src/bench/metrics_scraper.rs
Normal file
@@ -0,0 +1,156 @@
|
||||
//! Prometheus metrics scraper for reth-bench.
|
||||
//!
|
||||
//! Scrapes a node's Prometheus metrics endpoint after each block to record
|
||||
//! execution and state root durations with block-level granularity.
|
||||
|
||||
use csv::Writer;
|
||||
use eyre::Context;
|
||||
use reqwest::Client;
|
||||
use serde::Serialize;
|
||||
use std::{path::Path, time::Duration};
|
||||
use tracing::info;
|
||||
|
||||
/// Suffix for the metrics CSV output file.
///
/// Joined onto the benchmark output directory by `MetricsScraper::write_csv`.
pub(crate) const METRICS_OUTPUT_SUFFIX: &str = "metrics.csv";
|
||||
|
||||
/// A single row of scraped prometheus metrics for one block.
///
/// Serialized directly into the CSV output via `serde`, so the field order
/// here determines the CSV column order.
#[derive(Debug, Clone, Serialize)]
pub(crate) struct MetricsRow {
    /// The block number.
    pub(crate) block_number: u64,
    /// EVM execution duration in seconds (from `sync_execution_execution_duration` gauge).
    ///
    /// `None` when the gauge was absent from (or unparsable in) the scrape.
    pub(crate) execution_duration_secs: Option<f64>,
    /// State root computation duration in seconds (from
    /// `sync_block_validation_state_root_duration` gauge).
    ///
    /// `None` when the gauge was absent from (or unparsable in) the scrape.
    pub(crate) state_root_duration_secs: Option<f64>,
}
|
||||
|
||||
/// Scrapes a Prometheus metrics endpoint after each block to collect
/// execution and state root durations.
///
/// Rows accumulate in memory for the duration of the benchmark run and are
/// flushed to disk by `write_csv`.
pub(crate) struct MetricsScraper {
    /// The full URL of the Prometheus metrics endpoint.
    url: String,
    /// Reusable HTTP client.
    ///
    /// Built once with a request timeout so a slow endpoint cannot stall the
    /// benchmark loop indefinitely.
    client: Client,
    /// Collected metrics rows, one per block.
    rows: Vec<MetricsRow>,
}
|
||||
|
||||
impl MetricsScraper {
|
||||
/// Creates a new scraper if a URL is provided.
|
||||
pub(crate) fn maybe_new(url: Option<String>) -> Option<Self> {
|
||||
url.map(|url| {
|
||||
info!(target: "reth-bench", %url, "Prometheus metrics scraping enabled");
|
||||
let client = Client::builder()
|
||||
.timeout(Duration::from_secs(5))
|
||||
.build()
|
||||
.expect("failed to build reqwest client");
|
||||
Self { url, client, rows: Vec::new() }
|
||||
})
|
||||
}
|
||||
|
||||
/// Scrapes the metrics endpoint and records values for the given block.
|
||||
pub(crate) async fn scrape_after_block(&mut self, block_number: u64) -> eyre::Result<()> {
|
||||
let text = self
|
||||
.client
|
||||
.get(&self.url)
|
||||
.send()
|
||||
.await
|
||||
.wrap_err("failed to fetch metrics endpoint")?
|
||||
.error_for_status()
|
||||
.wrap_err("metrics endpoint returned error status")?
|
||||
.text()
|
||||
.await
|
||||
.wrap_err("failed to read metrics response body")?;
|
||||
|
||||
let execution = parse_gauge(&text, "sync_execution_execution_duration");
|
||||
let state_root = parse_gauge(&text, "sync_block_validation_state_root_duration");
|
||||
|
||||
self.rows.push(MetricsRow {
|
||||
block_number,
|
||||
execution_duration_secs: execution,
|
||||
state_root_duration_secs: state_root,
|
||||
});
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Writes collected metrics to a CSV file in the output directory.
|
||||
pub(crate) fn write_csv(&self, output_dir: &Path) -> eyre::Result<()> {
|
||||
let path = output_dir.join(METRICS_OUTPUT_SUFFIX);
|
||||
info!(target: "reth-bench", "Writing scraped metrics to file: {:?}", path);
|
||||
let mut writer = Writer::from_path(&path)?;
|
||||
for row in &self.rows {
|
||||
writer.serialize(row)?;
|
||||
}
|
||||
writer.flush()?;
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
/// Parses a Prometheus gauge value from exposition-format text.
///
/// Searches for lines starting with `name` followed by either whitespace or
/// `{` (for labeled metrics), then parses the numeric value. Returns the last
/// matching sample to handle metrics emitted with multiple label sets.
///
/// Unlike a naive whitespace split, this skips past the label set before
/// reading the value, so quoted label values containing spaces
/// (e.g. `metric{job="a b"} 0.5`) are parsed correctly.
fn parse_gauge(text: &str, name: &str) -> Option<f64> {
    let mut result = None;
    for line in text.lines() {
        let line = line.trim();
        // Skip blank lines and `# HELP` / `# TYPE` comment lines.
        if line.is_empty() || line.starts_with('#') {
            continue;
        }

        // Ensure we match the full metric name, not a prefix of another metric.
        let Some(rest) = line.strip_prefix(name) else {
            continue;
        };

        // Format: `metric_name{labels} value [timestamp]` or `metric_name value [timestamp]`.
        let after_labels = if let Some(labeled) = rest.strip_prefix('{') {
            // Skip the whole label set. The value/timestamp after the labels are
            // plain numbers and cannot contain `}`, so the last `}` on the line
            // terminates the labels even if a quoted label value contains one.
            match labeled.rfind('}') {
                Some(idx) => &labeled[idx + 1..],
                // Unterminated label set; ignore the malformed line.
                None => continue,
            }
        } else if rest.starts_with(char::is_whitespace) {
            rest
        } else {
            // `rest` continues with an identifier char: this is a longer metric
            // name sharing our name as a prefix (e.g. `<name>_total`).
            continue;
        };

        // The value is the first token after the name/labels; a trailing
        // timestamp, if present, is ignored.
        if let Some(v) = after_labels.split_whitespace().next().and_then(|t| t.parse::<f64>().ok())
        {
            result = Some(v);
        }
    }
    result
}
|
||||
|
||||
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_parse_gauge_simple() {
        // HELP/TYPE comment lines must be skipped before the sample line.
        let text = "# HELP sync_execution_execution_duration Duration of execution\n\
                    # TYPE sync_execution_execution_duration gauge\n\
                    sync_execution_execution_duration 0.123456\n";
        assert_eq!(parse_gauge(text, "sync_execution_execution_duration"), Some(0.123456));
    }

    #[test]
    fn test_parse_gauge_missing() {
        // A metric that never appears yields `None`.
        assert_eq!(
            parse_gauge("some_other_metric 1.0\n", "sync_execution_execution_duration"),
            None
        );
    }

    #[test]
    fn test_parse_gauge_with_labels() {
        // Labeled samples carry `{...}` between the name and the value.
        let text = r#"sync_block_validation_state_root_duration{instance="node1"} 0.5
"#;
        assert_eq!(parse_gauge(text, "sync_block_validation_state_root_duration"), Some(0.5));
    }

    #[test]
    fn test_parse_gauge_prefix_no_false_match() {
        // `..._total` shares a prefix with the target name and must not match.
        let text = "sync_execution_execution_duration_total 99.0\n\
                    sync_execution_execution_duration 0.5\n";
        assert_eq!(parse_gauge(text, "sync_execution_execution_duration"), Some(0.5));
    }
}
|
||||
@@ -12,6 +12,7 @@ pub(crate) mod helpers;
|
||||
pub use generate_big_block::{
|
||||
RawTransaction, RpcTransactionSource, TransactionCollector, TransactionSource,
|
||||
};
|
||||
pub(crate) mod metrics_scraper;
|
||||
mod new_payload_fcu;
|
||||
mod new_payload_only;
|
||||
mod output;
|
||||
|
||||
@@ -13,6 +13,7 @@ use crate::{
|
||||
bench::{
|
||||
context::BenchContext,
|
||||
helpers::parse_duration,
|
||||
metrics_scraper::MetricsScraper,
|
||||
output::{
|
||||
write_benchmark_results, CombinedResult, NewPayloadResult, TotalGasOutput, TotalGasRow,
|
||||
},
|
||||
@@ -30,7 +31,7 @@ use reth_cli_runner::CliContext;
|
||||
use reth_engine_primitives::config::DEFAULT_PERSISTENCE_THRESHOLD;
|
||||
use reth_node_core::args::BenchmarkArgs;
|
||||
use std::time::{Duration, Instant};
|
||||
use tracing::{debug, info};
|
||||
use tracing::{debug, info, warn};
|
||||
|
||||
/// `reth benchmark new-payload-fcu` command
|
||||
#[derive(Debug, Parser)]
|
||||
@@ -155,6 +156,8 @@ impl Command {
|
||||
|
||||
let total_blocks = benchmark_mode.total_blocks();
|
||||
|
||||
let mut metrics_scraper = MetricsScraper::maybe_new(self.benchmark.metrics_url.clone());
|
||||
|
||||
if use_reth_namespace {
|
||||
info!("Using reth_newPayload endpoint");
|
||||
}
|
||||
@@ -288,6 +291,12 @@ impl Command {
|
||||
};
|
||||
info!(target: "reth-bench", progress, %combined_result);
|
||||
|
||||
if let Some(scraper) = metrics_scraper.as_mut() &&
|
||||
let Err(err) = scraper.scrape_after_block(block_number).await
|
||||
{
|
||||
warn!(target: "reth-bench", %err, block_number, "Failed to scrape metrics");
|
||||
}
|
||||
|
||||
if let Some(w) = &mut waiter {
|
||||
w.on_block(block_number).await?;
|
||||
}
|
||||
@@ -313,6 +322,10 @@ impl Command {
|
||||
write_benchmark_results(path, &gas_output_results, &combined_results)?;
|
||||
}
|
||||
|
||||
if let (Some(path), Some(scraper)) = (&self.benchmark.output, &metrics_scraper) {
|
||||
scraper.write_csv(path)?;
|
||||
}
|
||||
|
||||
let gas_output =
|
||||
TotalGasOutput::with_combined_results(gas_output_results, &combined_results)?;
|
||||
|
||||
|
||||
@@ -3,6 +3,7 @@
|
||||
use crate::{
|
||||
bench::{
|
||||
context::BenchContext,
|
||||
metrics_scraper::MetricsScraper,
|
||||
output::{
|
||||
NewPayloadResult, TotalGasOutput, TotalGasRow, GAS_OUTPUT_SUFFIX,
|
||||
NEW_PAYLOAD_OUTPUT_SUFFIX,
|
||||
@@ -54,6 +55,8 @@ impl Command {
|
||||
|
||||
let total_blocks = benchmark_mode.total_blocks();
|
||||
|
||||
let mut metrics_scraper = MetricsScraper::maybe_new(self.benchmark.metrics_url.clone());
|
||||
|
||||
if use_reth_namespace {
|
||||
info!("Using reth_newPayload endpoint");
|
||||
}
|
||||
@@ -142,6 +145,12 @@ impl Command {
|
||||
let row =
|
||||
TotalGasRow { block_number, transaction_count, gas_used, time: current_duration };
|
||||
results.push((row, new_payload_result));
|
||||
|
||||
if let Some(scraper) = metrics_scraper.as_mut() &&
|
||||
let Err(err) = scraper.scrape_after_block(block_number).await
|
||||
{
|
||||
tracing::warn!(target: "reth-bench", %err, block_number, "Failed to scrape metrics");
|
||||
}
|
||||
}
|
||||
|
||||
// Check if the spawned task encountered an error
|
||||
@@ -172,6 +181,10 @@ impl Command {
|
||||
}
|
||||
writer.flush()?;
|
||||
|
||||
if let Some(scraper) = &metrics_scraper {
|
||||
scraper.write_csv(&path)?;
|
||||
}
|
||||
|
||||
info!(target: "reth-bench", "Finished writing benchmark output files to {:?}.", path);
|
||||
}
|
||||
|
||||
|
||||
@@ -15,6 +15,7 @@ use crate::{
|
||||
authenticated_transport::AuthenticatedTransportConnect,
|
||||
bench::{
|
||||
helpers::parse_duration,
|
||||
metrics_scraper::MetricsScraper,
|
||||
output::{
|
||||
write_benchmark_results, CombinedResult, GasRampPayloadFile, NewPayloadResult,
|
||||
TotalGasOutput, TotalGasRow,
|
||||
@@ -135,6 +136,14 @@ pub struct Command {
|
||||
/// and returns server-side timing breakdowns (latency, persistence wait, cache wait).
|
||||
#[arg(long, default_value = "false", verbatim_doc_comment)]
|
||||
reth_new_payload: bool,
|
||||
|
||||
/// Optional Prometheus metrics endpoint to scrape after each block.
|
||||
///
|
||||
/// When provided, reth-bench will fetch metrics from this URL after each
|
||||
/// payload, recording per-block execution and state root durations.
|
||||
/// Results are written to `metrics.csv` in the output directory.
|
||||
#[arg(long = "metrics-url", value_name = "URL", verbatim_doc_comment)]
|
||||
metrics_url: Option<String>,
|
||||
}
|
||||
|
||||
/// A loaded payload ready for execution.
|
||||
@@ -204,6 +213,8 @@ impl Command {
|
||||
(None, false) => None,
|
||||
};
|
||||
|
||||
let mut metrics_scraper = MetricsScraper::maybe_new(self.metrics_url.clone());
|
||||
|
||||
// Set up authenticated engine provider
|
||||
let jwt =
|
||||
std::fs::read_to_string(&self.jwt_secret).wrap_err("Failed to read JWT secret file")?;
|
||||
@@ -395,6 +406,12 @@ impl Command {
|
||||
let progress = format!("{}/{}", i + 1, payloads.len());
|
||||
info!(target: "reth-bench", progress, %combined_result);
|
||||
|
||||
if let Some(scraper) = metrics_scraper.as_mut() &&
|
||||
let Err(err) = scraper.scrape_after_block(block_number).await
|
||||
{
|
||||
tracing::warn!(target: "reth-bench", %err, block_number, "Failed to scrape metrics");
|
||||
}
|
||||
|
||||
if let Some(w) = &mut waiter {
|
||||
w.on_block(block_number).await?;
|
||||
}
|
||||
@@ -418,6 +435,10 @@ impl Command {
|
||||
write_benchmark_results(path, &gas_output_results, &combined_results)?;
|
||||
}
|
||||
|
||||
if let (Some(path), Some(scraper)) = (&self.output, &metrics_scraper) {
|
||||
scraper.write_csv(path)?;
|
||||
}
|
||||
|
||||
let gas_output =
|
||||
TotalGasOutput::with_combined_results(gas_output_results, &combined_results)?;
|
||||
info!(
|
||||
|
||||
@@ -60,6 +60,17 @@ pub struct BenchmarkArgs {
|
||||
#[arg(long, short, value_name = "BENCHMARK_OUTPUT", verbatim_doc_comment)]
|
||||
pub output: Option<PathBuf>,
|
||||
|
||||
/// Optional Prometheus metrics endpoint to scrape after each block.
|
||||
///
|
||||
/// When provided, reth-bench will fetch metrics from this URL after each
|
||||
/// `newPayload` / `forkchoiceUpdated` call, recording per-block execution
|
||||
/// and state root durations. Results are written to `metrics.csv` in the
|
||||
/// output directory.
|
||||
///
|
||||
/// Example: `http://127.0.0.1:9001/metrics`
|
||||
#[arg(long = "metrics-url", value_name = "URL", verbatim_doc_comment)]
|
||||
pub metrics_url: Option<String>,
|
||||
|
||||
/// Use `reth_newPayload` endpoint instead of `engine_newPayload*`.
|
||||
///
|
||||
/// The `reth_newPayload` endpoint is a reth-specific extension that takes `ExecutionData`
|
||||
|
||||
Reference in New Issue
Block a user