diff --git a/bin/reth-bench/src/bench/metrics_scraper.rs b/bin/reth-bench/src/bench/metrics_scraper.rs
new file mode 100644
index 0000000000..3228df7a34
--- /dev/null
+++ b/bin/reth-bench/src/bench/metrics_scraper.rs
@@ -0,0 +1,212 @@
+//! Prometheus metrics scraper for reth-bench.
+//!
+//! Scrapes a node's Prometheus metrics endpoint after each block to record
+//! execution and state root durations with block-level granularity.
+
+use csv::Writer;
+use eyre::Context;
+use reqwest::Client;
+use serde::Serialize;
+use std::{path::Path, time::Duration};
+use tracing::info;
+
+/// Suffix for the metrics CSV output file.
+pub(crate) const METRICS_OUTPUT_SUFFIX: &str = "metrics.csv";
+
+/// A single row of scraped prometheus metrics for one block.
+#[derive(Debug, Clone, Serialize)]
+pub(crate) struct MetricsRow {
+    /// The block number.
+    pub(crate) block_number: u64,
+    /// EVM execution duration in seconds (from `sync_execution_execution_duration` gauge).
+    pub(crate) execution_duration_secs: Option<f64>,
+    /// State root computation duration in seconds (from
+    /// `sync_block_validation_state_root_duration` gauge).
+    pub(crate) state_root_duration_secs: Option<f64>,
+}
+
+/// Scrapes a Prometheus metrics endpoint after each block to collect
+/// execution and state root durations.
+pub(crate) struct MetricsScraper {
+    /// The full URL of the Prometheus metrics endpoint.
+    url: String,
+    /// Reusable HTTP client.
+    client: Client,
+    /// Collected metrics rows, one per block.
+    rows: Vec<MetricsRow>,
+}
+
+impl MetricsScraper {
+    /// Creates a new scraper if a URL is provided.
+    ///
+    /// # Panics
+    ///
+    /// Panics if the HTTP client cannot be built.
+    pub(crate) fn maybe_new(url: Option<String>) -> Option<Self> {
+        url.map(|url| {
+            info!(target: "reth-bench", %url, "Prometheus metrics scraping enabled");
+            let client = Client::builder()
+                .timeout(Duration::from_secs(5))
+                .build()
+                .expect("failed to build reqwest client");
+            Self { url, client, rows: Vec::new() }
+        })
+    }
+
+    /// Scrapes the metrics endpoint and records values for the given block.
+    ///
+    /// A row is pushed whenever the endpoint responds successfully; a gauge that is
+    /// absent from the response is recorded as `None`.
+    ///
+    /// # Errors
+    ///
+    /// Returns an error if the request fails, the endpoint answers with an error
+    /// status, or the response body cannot be read. No row is recorded in that case.
+    pub(crate) async fn scrape_after_block(&mut self, block_number: u64) -> eyre::Result<()> {
+        let text = self
+            .client
+            .get(&self.url)
+            .send()
+            .await
+            .wrap_err("failed to fetch metrics endpoint")?
+            .error_for_status()
+            .wrap_err("metrics endpoint returned error status")?
+            .text()
+            .await
+            .wrap_err("failed to read metrics response body")?;
+
+        let execution = parse_gauge(&text, "sync_execution_execution_duration");
+        let state_root = parse_gauge(&text, "sync_block_validation_state_root_duration");
+
+        self.rows.push(MetricsRow {
+            block_number,
+            execution_duration_secs: execution,
+            state_root_duration_secs: state_root,
+        });
+        Ok(())
+    }
+
+    /// Writes collected metrics to a CSV file in the output directory.
+    ///
+    /// # Errors
+    ///
+    /// Returns an error if the file cannot be created or a row cannot be written.
+    pub(crate) fn write_csv(&self, output_dir: &Path) -> eyre::Result<()> {
+        let path = output_dir.join(METRICS_OUTPUT_SUFFIX);
+        info!(target: "reth-bench", "Writing scraped metrics to file: {:?}", path);
+        let mut writer = Writer::from_path(&path)?;
+        for row in &self.rows {
+            writer.serialize(row)?;
+        }
+        writer.flush()?;
+        Ok(())
+    }
+}
+
+/// Parses a Prometheus gauge value from exposition-format text.
+///
+/// Considers every sample line whose metric name is exactly `name` (optionally
+/// followed by a `{...}` label set) and returns the value of the last one that
+/// parses as a number, so metrics emitted with multiple label sets still yield a
+/// value. Returns `None` if no such sample exists.
+fn parse_gauge(text: &str, name: &str) -> Option<f64> {
+    // Scan from the end: the last parsable sample wins.
+    text.lines().rev().find_map(|line| parse_sample_value(line.trim(), name))
+}
+
+/// Extracts the value from a single exposition line if it is a sample of `name`.
+///
+/// Expected format: `metric_name{labels} value [timestamp]` or
+/// `metric_name value [timestamp]`.
+fn parse_sample_value(line: &str, name: &str) -> Option<f64> {
+    // `# HELP` / `# TYPE` lines are comments, not samples.
+    if line.starts_with('#') {
+        return None;
+    }
+
+    // Ensure we match the full metric name, not a prefix of another metric.
+    let rest = line.strip_prefix(name)?;
+    let after_labels = match rest.chars().next()? {
+        '{' => skip_label_set(rest)?,
+        ' ' | '\t' => rest,
+        _ => return None,
+    };
+
+    // The value is the first token after the name and optional label set. It cannot
+    // be located by splitting the whole line on whitespace because quoted label
+    // values may themselves contain spaces.
+    after_labels.split_whitespace().next()?.parse().ok()
+}
+
+/// Returns the text following the closing `}` of the label set that `s` starts with.
+///
+/// Braces inside quoted label values and backslash-escaped characters are skipped.
+/// Returns `None` if the label set is never closed.
+fn skip_label_set(s: &str) -> Option<&str> {
+    let mut in_quotes = false;
+    let mut escaped = false;
+    for (idx, ch) in s.char_indices() {
+        if escaped {
+            escaped = false;
+            continue;
+        }
+        match ch {
+            '\\' if in_quotes => escaped = true,
+            '"' => in_quotes = !in_quotes,
+            '}' if !in_quotes => return s.get(idx + ch.len_utf8()..),
+            _ => {}
+        }
+    }
+    None
+}
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+
+    #[test]
+    fn test_parse_gauge_simple() {
+        let text = r#"# HELP sync_execution_execution_duration Duration of execution
+# TYPE sync_execution_execution_duration gauge
+sync_execution_execution_duration 0.123456
+"#;
+        assert_eq!(parse_gauge(text, "sync_execution_execution_duration"), Some(0.123456));
+    }
+
+    #[test]
+    fn test_parse_gauge_missing() {
+        let text = "some_other_metric 1.0\n";
+        assert_eq!(parse_gauge(text, "sync_execution_execution_duration"), None);
+    }
+
+    #[test]
+    fn test_parse_gauge_with_labels() {
+        let text = "sync_block_validation_state_root_duration{instance=\"node1\"} 0.5\n";
+        assert_eq!(parse_gauge(text, "sync_block_validation_state_root_duration"), Some(0.5));
+    }
+
+    #[test]
+    fn test_parse_gauge_prefix_no_false_match() {
+        let text =
+            "sync_execution_execution_duration_total 99.0\nsync_execution_execution_duration 0.5\n";
+        assert_eq!(parse_gauge(text, "sync_execution_execution_duration"), Some(0.5));
+    }
+
+    #[test]
+    fn test_parse_gauge_label_value_with_whitespace() {
+        let text = "sync_execution_execution_duration{node=\"my node\"} 0.25 1700000000\n";
+        assert_eq!(parse_gauge(text, "sync_execution_execution_duration"), Some(0.25));
+    }
+
+    #[test]
+    fn test_parse_gauge_brace_and_quote_in_label_value() {
+        let text = "sync_execution_execution_duration{note=\"a } \\\" b\"} 0.75\n";
+        assert_eq!(parse_gauge(text, "sync_execution_execution_duration"), Some(0.75));
+    }
+
+    #[test]
+    fn test_parse_gauge_tab_separator() {
+        let text = "sync_execution_execution_duration\t0.5\n";
+        assert_eq!(parse_gauge(text, "sync_execution_execution_duration"), Some(0.5));
+    }
+}
diff --git a/bin/reth-bench/src/bench/mod.rs b/bin/reth-bench/src/bench/mod.rs
index c18bcc9908..ee6826ffad 100644
--- a/bin/reth-bench/src/bench/mod.rs
+++ b/bin/reth-bench/src/bench/mod.rs
@@ -12,6 +12,7 @@ pub(crate) mod helpers;
 pub use generate_big_block::{
     RawTransaction, RpcTransactionSource, TransactionCollector, TransactionSource,
 };
+pub(crate) mod metrics_scraper;
 mod new_payload_fcu;
 mod new_payload_only;
 mod output;
diff --git a/bin/reth-bench/src/bench/new_payload_fcu.rs b/bin/reth-bench/src/bench/new_payload_fcu.rs
index 13dc5bcb19..0c0bde76dd 100644
--- a/bin/reth-bench/src/bench/new_payload_fcu.rs
+++ b/bin/reth-bench/src/bench/new_payload_fcu.rs
@@ -13,6 +13,7 @@ use crate::{
     bench::{
         context::BenchContext,
         helpers::parse_duration,
+        metrics_scraper::MetricsScraper,
         output::{
             write_benchmark_results,
CombinedResult, NewPayloadResult, TotalGasOutput, TotalGasRow, }, @@ -30,7 +31,7 @@ use reth_cli_runner::CliContext; use reth_engine_primitives::config::DEFAULT_PERSISTENCE_THRESHOLD; use reth_node_core::args::BenchmarkArgs; use std::time::{Duration, Instant}; -use tracing::{debug, info}; +use tracing::{debug, info, warn}; /// `reth benchmark new-payload-fcu` command #[derive(Debug, Parser)] @@ -155,6 +156,8 @@ impl Command { let total_blocks = benchmark_mode.total_blocks(); + let mut metrics_scraper = MetricsScraper::maybe_new(self.benchmark.metrics_url.clone()); + if use_reth_namespace { info!("Using reth_newPayload endpoint"); } @@ -288,6 +291,12 @@ impl Command { }; info!(target: "reth-bench", progress, %combined_result); + if let Some(scraper) = metrics_scraper.as_mut() && + let Err(err) = scraper.scrape_after_block(block_number).await + { + warn!(target: "reth-bench", %err, block_number, "Failed to scrape metrics"); + } + if let Some(w) = &mut waiter { w.on_block(block_number).await?; } @@ -313,6 +322,10 @@ impl Command { write_benchmark_results(path, &gas_output_results, &combined_results)?; } + if let (Some(path), Some(scraper)) = (&self.benchmark.output, &metrics_scraper) { + scraper.write_csv(path)?; + } + let gas_output = TotalGasOutput::with_combined_results(gas_output_results, &combined_results)?; diff --git a/bin/reth-bench/src/bench/new_payload_only.rs b/bin/reth-bench/src/bench/new_payload_only.rs index d9d3b9d9d2..1b875c729c 100644 --- a/bin/reth-bench/src/bench/new_payload_only.rs +++ b/bin/reth-bench/src/bench/new_payload_only.rs @@ -3,6 +3,7 @@ use crate::{ bench::{ context::BenchContext, + metrics_scraper::MetricsScraper, output::{ NewPayloadResult, TotalGasOutput, TotalGasRow, GAS_OUTPUT_SUFFIX, NEW_PAYLOAD_OUTPUT_SUFFIX, @@ -54,6 +55,8 @@ impl Command { let total_blocks = benchmark_mode.total_blocks(); + let mut metrics_scraper = MetricsScraper::maybe_new(self.benchmark.metrics_url.clone()); + if use_reth_namespace { info!("Using 
reth_newPayload endpoint"); } @@ -142,6 +145,12 @@ impl Command { let row = TotalGasRow { block_number, transaction_count, gas_used, time: current_duration }; results.push((row, new_payload_result)); + + if let Some(scraper) = metrics_scraper.as_mut() && + let Err(err) = scraper.scrape_after_block(block_number).await + { + tracing::warn!(target: "reth-bench", %err, block_number, "Failed to scrape metrics"); + } } // Check if the spawned task encountered an error @@ -172,6 +181,10 @@ impl Command { } writer.flush()?; + if let Some(scraper) = &metrics_scraper { + scraper.write_csv(&path)?; + } + info!(target: "reth-bench", "Finished writing benchmark output files to {:?}.", path); } diff --git a/bin/reth-bench/src/bench/replay_payloads.rs b/bin/reth-bench/src/bench/replay_payloads.rs index 25d0d307f4..0e88034a65 100644 --- a/bin/reth-bench/src/bench/replay_payloads.rs +++ b/bin/reth-bench/src/bench/replay_payloads.rs @@ -15,6 +15,7 @@ use crate::{ authenticated_transport::AuthenticatedTransportConnect, bench::{ helpers::parse_duration, + metrics_scraper::MetricsScraper, output::{ write_benchmark_results, CombinedResult, GasRampPayloadFile, NewPayloadResult, TotalGasOutput, TotalGasRow, @@ -135,6 +136,14 @@ pub struct Command { /// and returns server-side timing breakdowns (latency, persistence wait, cache wait). #[arg(long, default_value = "false", verbatim_doc_comment)] reth_new_payload: bool, + + /// Optional Prometheus metrics endpoint to scrape after each block. + /// + /// When provided, reth-bench will fetch metrics from this URL after each + /// payload, recording per-block execution and state root durations. + /// Results are written to `metrics.csv` in the output directory. + #[arg(long = "metrics-url", value_name = "URL", verbatim_doc_comment)] + metrics_url: Option, } /// A loaded payload ready for execution. 
@@ -204,6 +213,8 @@ impl Command { (None, false) => None, }; + let mut metrics_scraper = MetricsScraper::maybe_new(self.metrics_url.clone()); + // Set up authenticated engine provider let jwt = std::fs::read_to_string(&self.jwt_secret).wrap_err("Failed to read JWT secret file")?; @@ -395,6 +406,12 @@ impl Command { let progress = format!("{}/{}", i + 1, payloads.len()); info!(target: "reth-bench", progress, %combined_result); + if let Some(scraper) = metrics_scraper.as_mut() && + let Err(err) = scraper.scrape_after_block(block_number).await + { + tracing::warn!(target: "reth-bench", %err, block_number, "Failed to scrape metrics"); + } + if let Some(w) = &mut waiter { w.on_block(block_number).await?; } @@ -418,6 +435,10 @@ impl Command { write_benchmark_results(path, &gas_output_results, &combined_results)?; } + if let (Some(path), Some(scraper)) = (&self.output, &metrics_scraper) { + scraper.write_csv(path)?; + } + let gas_output = TotalGasOutput::with_combined_results(gas_output_results, &combined_results)?; info!( diff --git a/crates/node/core/src/args/benchmark_args.rs b/crates/node/core/src/args/benchmark_args.rs index 3a520a71a4..bf0c4b1cf4 100644 --- a/crates/node/core/src/args/benchmark_args.rs +++ b/crates/node/core/src/args/benchmark_args.rs @@ -60,6 +60,17 @@ pub struct BenchmarkArgs { #[arg(long, short, value_name = "BENCHMARK_OUTPUT", verbatim_doc_comment)] pub output: Option, + /// Optional Prometheus metrics endpoint to scrape after each block. + /// + /// When provided, reth-bench will fetch metrics from this URL after each + /// `newPayload` / `forkchoiceUpdated` call, recording per-block execution + /// and state root durations. Results are written to `metrics.csv` in the + /// output directory. + /// + /// Example: `http://127.0.0.1:9001/metrics` + #[arg(long = "metrics-url", value_name = "URL", verbatim_doc_comment)] + pub metrics_url: Option, + /// Use `reth_newPayload` endpoint instead of `engine_newPayload*`. 
/// /// The `reth_newPayload` endpoint is a reth-specific extension that takes `ExecutionData`