diff --git a/bin/reth-bench/src/bench/mod.rs b/bin/reth-bench/src/bench/mod.rs index 5ccbc77546..3bb84915c9 100644 --- a/bin/reth-bench/src/bench/mod.rs +++ b/bin/reth-bench/src/bench/mod.rs @@ -15,6 +15,7 @@ pub use generate_big_block::{ mod new_payload_fcu; mod new_payload_only; mod output; +mod persistence_waiter; mod replay_payloads; mod send_invalid_payload; mod send_payload; diff --git a/bin/reth-bench/src/bench/new_payload_fcu.rs b/bin/reth-bench/src/bench/new_payload_fcu.rs index 62e0aef259..e92556765b 100644 --- a/bin/reth-bench/src/bench/new_payload_fcu.rs +++ b/bin/reth-bench/src/bench/new_payload_fcu.rs @@ -15,19 +15,17 @@ use crate::{ output::{ write_benchmark_results, CombinedResult, NewPayloadResult, TotalGasOutput, TotalGasRow, }, + persistence_waiter::{ + engine_url_to_ws_url, setup_persistence_subscription, PersistenceWaiter, + PERSISTENCE_CHECKPOINT_TIMEOUT, + }, }, valid_payload::{block_to_new_payload, call_forkchoice_updated, call_new_payload}, }; -use alloy_eips::BlockNumHash; -use alloy_network::Ethereum; -use alloy_provider::{Provider, RootProvider}; -use alloy_pubsub::SubscriptionStream; -use alloy_rpc_client::RpcClient; +use alloy_provider::Provider; use alloy_rpc_types_engine::ForkchoiceState; -use alloy_transport_ws::WsConnect; use clap::Parser; use eyre::{Context, OptionExt}; -use futures::StreamExt; use humantime::parse_duration; use reth_cli_runner::CliContext; use reth_engine_primitives::config::DEFAULT_PERSISTENCE_THRESHOLD; @@ -36,8 +34,6 @@ use std::time::{Duration, Instant}; use tracing::{debug, info}; use url::Url; -const PERSISTENCE_CHECKPOINT_TIMEOUT: Duration = Duration::from_secs(60); - /// `reth benchmark new-payload-fcu` command #[derive(Debug, Parser)] pub struct Command { @@ -105,7 +101,8 @@ impl Command { let mut waiter = match (self.wait_time, self.wait_for_persistence) { (Some(duration), _) => Some(PersistenceWaiter::with_duration(duration)), (None, true) => { - let sub = self.setup_persistence_subscription().await?; + let ws_url = self.derive_ws_rpc_url()?; + let sub = setup_persistence_subscription(ws_url).await?; Some(PersistenceWaiter::with_subscription( sub, self.persistence_threshold, @@ -245,17 +242,20 @@ impl Command { results.into_iter().unzip(); if let Some(ref path) = self.benchmark.output { - write_benchmark_results(path, &gas_output_results, combined_results)?; + write_benchmark_results(path, &gas_output_results, &combined_results)?; } - let gas_output = TotalGasOutput::new(gas_output_results)?; + let gas_output = + TotalGasOutput::with_combined_results(gas_output_results, &combined_results)?; info!( - total_duration=?gas_output.total_duration, - total_gas_used=?gas_output.total_gas_used, - blocks_processed=?gas_output.blocks_processed, - "Total Ggas/s: {:.4}", - gas_output.total_gigagas_per_second() + total_gas_used = gas_output.total_gas_used, + total_duration = ?gas_output.total_duration, + execution_duration = ?gas_output.execution_duration, + blocks_processed = gas_output.blocks_processed, + wall_clock_ggas_per_second = format_args!("{:.4}", gas_output.total_gigagas_per_second()), + execution_ggas_per_second = format_args!("{:.4}", gas_output.execution_gigagas_per_second()), + "Benchmark complete" ); Ok(()) @@ -289,249 +289,4 @@ impl Command { Ok(derived) } } - - /// Establishes a websocket connection and subscribes to `reth_subscribePersistedBlock`. - async fn setup_persistence_subscription(&self) -> eyre::Result { - let ws_url = self.derive_ws_rpc_url()?; - - info!("Connecting to WebSocket at {} for persistence subscription", ws_url); - - let ws_connect = WsConnect::new(ws_url.to_string()); - let client = RpcClient::connect_pubsub(ws_connect) - .await - .wrap_err("Failed to connect to WebSocket RPC endpoint")?; - let provider: RootProvider = RootProvider::new(client); - - let subscription = provider - .subscribe_to::("reth_subscribePersistedBlock") - .await - .wrap_err("Failed to subscribe to persistence notifications")?; - - info!("Subscribed to persistence notifications"); - - Ok(PersistenceSubscription::new(provider, subscription.into_stream())) - } -} - -/// Converts an engine API URL to the default RPC websocket URL. -/// -/// Transformations: -/// - `http` → `ws` -/// - `https` → `wss` -/// - `ws` / `wss` keep their scheme -/// - Port is always set to `8546`, reth's default RPC websocket port. -/// -/// This is used when we only know the engine API URL (typically `:8551`) but -/// need to connect to the node's WS RPC endpoint for persistence events. -fn engine_url_to_ws_url(engine_url: &str) -> eyre::Result { - let url: Url = engine_url - .parse() - .wrap_err_with(|| format!("Failed to parse engine RPC URL: {engine_url}"))?; - - let mut ws_url = url.clone(); - - match ws_url.scheme() { - "http" => ws_url - .set_scheme("ws") - .map_err(|_| eyre::eyre!("Failed to set WS scheme for URL: {url}"))?, - "https" => ws_url - .set_scheme("wss") - .map_err(|_| eyre::eyre!("Failed to set WSS scheme for URL: {url}"))?, - "ws" | "wss" => {} - scheme => { - return Err(eyre::eyre!( - "Unsupported URL scheme '{scheme}' for URL: {url}. Expected http, https, ws, or wss." - )) - } - } - - ws_url.set_port(Some(8546)).map_err(|_| eyre::eyre!("Failed to set port for URL: {url}"))?; - - Ok(ws_url) -} - -/// Waits until the persistence subscription reports that `target` has been persisted. -/// -/// Consumes subscription events until `last_persisted >= target`, or returns an error if: -/// - the subscription stream ends unexpectedly, or -/// - `timeout` elapses before `target` is observed. -async fn wait_for_persistence( - stream: &mut SubscriptionStream, - target: u64, - last_persisted: &mut u64, - timeout: Duration, -) -> eyre::Result<()> { - tokio::time::timeout(timeout, async { - while *last_persisted < target { - match stream.next().await { - Some(persisted) => { - *last_persisted = persisted.number; - debug!( - target: "reth-bench", - persisted_block = ?last_persisted, - "Received persistence notification" - ); - } - None => { - return Err(eyre::eyre!("Persistence subscription closed unexpectedly")); - } - } - } - Ok(()) - }) - .await - .map_err(|_| { - eyre::eyre!( - "Persistence timeout: target block {} not persisted within {:?}. Last persisted: {}", - target, - timeout, - last_persisted - ) - })? -} - -/// Wrapper that keeps both the subscription stream and the underlying provider alive. -/// The provider must be kept alive for the subscription to continue receiving events. -struct PersistenceSubscription { - _provider: RootProvider, - stream: SubscriptionStream, -} - -impl PersistenceSubscription { - const fn new( - provider: RootProvider, - stream: SubscriptionStream, - ) -> Self { - Self { _provider: provider, stream } - } - - const fn stream_mut(&mut self) -> &mut SubscriptionStream { - &mut self.stream - } -} - -/// Encapsulates the block waiting logic. -/// -/// Provides a simple `on_block()` interface that handles both: -/// - Fixed duration waits (when `wait_time` is set) -/// - Persistence-based waits (when `subscription` is set) -/// -/// For persistence mode, waits after every `(threshold + 1)` blocks. -struct PersistenceWaiter { - wait_time: Option, - subscription: Option, - blocks_sent: u64, - last_persisted: u64, - threshold: u64, - timeout: Duration, -} - -impl PersistenceWaiter { - const fn with_duration(wait_time: Duration) -> Self { - Self { - wait_time: Some(wait_time), - subscription: None, - blocks_sent: 0, - last_persisted: 0, - threshold: 0, - timeout: Duration::ZERO, - } - } - - const fn with_subscription( - subscription: PersistenceSubscription, - threshold: u64, - timeout: Duration, - ) -> Self { - Self { - wait_time: None, - subscription: Some(subscription), - blocks_sent: 0, - last_persisted: 0, - threshold, - timeout, - } - } - - /// Called once per block. Waits based on the configured mode. - #[allow(clippy::manual_is_multiple_of)] - async fn on_block(&mut self, block_number: u64) -> eyre::Result<()> { - if let Some(wait_time) = self.wait_time { - tokio::time::sleep(wait_time).await; - return Ok(()); - } - - let Some(ref mut subscription) = self.subscription else { - return Ok(()); - }; - - self.blocks_sent += 1; - - if self.blocks_sent % (self.threshold + 1) == 0 { - debug!( - target: "reth-bench", - target_block = ?block_number, - last_persisted = self.last_persisted, - blocks_sent = self.blocks_sent, - "Waiting for persistence" - ); - - wait_for_persistence( - subscription.stream_mut(), - block_number, - &mut self.last_persisted, - self.timeout, - ) - .await?; - - debug!( - target: "reth-bench", - persisted = self.last_persisted, - "Persistence caught up" - ); - } - - Ok(()) - } -} - -#[cfg(test)] -mod tests { - use super::*; - - #[test] - fn test_engine_url_to_ws_url() { - // http -> ws, always uses port 8546 - let result = engine_url_to_ws_url("http://localhost:8551").unwrap(); - assert_eq!(result.as_str(), "ws://localhost:8546/"); - - // https -> wss - let result = engine_url_to_ws_url("https://localhost:8551").unwrap(); - assert_eq!(result.as_str(), "wss://localhost:8546/"); - - // Custom engine port still maps to 8546 - let result = engine_url_to_ws_url("http://localhost:9551").unwrap(); - assert_eq!(result.port(), Some(8546)); - - // Already ws passthrough - let result = engine_url_to_ws_url("ws://localhost:8546").unwrap(); - assert_eq!(result.scheme(), "ws"); - - // Invalid inputs - assert!(engine_url_to_ws_url("ftp://localhost:8551").is_err()); - assert!(engine_url_to_ws_url("not a valid url").is_err()); - } - - #[tokio::test] - async fn test_waiter_with_duration() { - let mut waiter = PersistenceWaiter::with_duration(Duration::from_millis(1)); - - let start = Instant::now(); - waiter.on_block(1).await.unwrap(); - waiter.on_block(2).await.unwrap(); - waiter.on_block(3).await.unwrap(); - - // Should have waited ~3ms total - assert!(start.elapsed() >= Duration::from_millis(3)); - } } diff --git a/bin/reth-bench/src/bench/output.rs b/bin/reth-bench/src/bench/output.rs index 25a1deaf22..d367f63f3d 100644 --- a/bin/reth-bench/src/bench/output.rs +++ b/bin/reth-bench/src/bench/output.rs @@ -6,7 +6,7 @@ use csv::Writer; use eyre::OptionExt; use reth_primitives_traits::constants::GIGAGAS; use serde::{ser::SerializeStruct, Deserialize, Serialize}; -use std::{path::Path, time::Duration}; +use std::{fs, path::Path, time::Duration}; use tracing::info; /// This is the suffix for gas output csv files. @@ -158,29 +158,58 @@ pub(crate) struct TotalGasRow { pub(crate) struct TotalGasOutput { /// The total gas used in the benchmark. pub(crate) total_gas_used: u64, - /// The total duration of the benchmark. + /// The total wall-clock duration of the benchmark (includes wait times). pub(crate) total_duration: Duration, - /// The total gas used per second. - pub(crate) total_gas_per_second: f64, + /// The total execution-only duration (excludes wait times). + pub(crate) execution_duration: Duration, /// The number of blocks processed. pub(crate) blocks_processed: u64, } impl TotalGasOutput { - /// Create a new [`TotalGasOutput`] from a list of [`TotalGasRow`]. + /// Create a new [`TotalGasOutput`] from gas rows only. + /// + /// Use this when execution-only timing is not available (e.g., `new_payload_only`). + /// `execution_duration` will equal `total_duration`. pub(crate) fn new(rows: Vec) -> eyre::Result { - // the duration is obtained from the last row let total_duration = rows.last().map(|row| row.time).ok_or_eyre("empty results")?; let blocks_processed = rows.len() as u64; let total_gas_used: u64 = rows.into_iter().map(|row| row.gas_used).sum(); - let total_gas_per_second = total_gas_used as f64 / total_duration.as_secs_f64(); - Ok(Self { total_gas_used, total_duration, total_gas_per_second, blocks_processed }) + Ok(Self { + total_gas_used, + total_duration, + execution_duration: total_duration, + blocks_processed, + }) } - /// Return the total gigagas per second. + /// Create a new [`TotalGasOutput`] from gas rows and combined results. + /// + /// - `rows`: Used for total gas and wall-clock duration + /// - `combined_results`: Used for execution-only duration (sum of `total_latency`) + pub(crate) fn with_combined_results( + rows: Vec, + combined_results: &[CombinedResult], + ) -> eyre::Result { + let total_duration = rows.last().map(|row| row.time).ok_or_eyre("empty results")?; + let blocks_processed = rows.len() as u64; + let total_gas_used: u64 = rows.into_iter().map(|row| row.gas_used).sum(); + + // Sum execution-only time from combined results + let execution_duration: Duration = combined_results.iter().map(|r| r.total_latency).sum(); + + Ok(Self { total_gas_used, total_duration, execution_duration, blocks_processed }) + } + + /// Return the total gigagas per second based on wall-clock time. pub(crate) fn total_gigagas_per_second(&self) -> f64 { - self.total_gas_per_second / GIGAGAS as f64 + self.total_gas_used as f64 / self.total_duration.as_secs_f64() / GIGAGAS as f64 + } + + /// Return the execution-only gigagas per second (excludes wait times). + pub(crate) fn execution_gigagas_per_second(&self) -> f64 { + self.total_gas_used as f64 / self.execution_duration.as_secs_f64() / GIGAGAS as f64 } } @@ -192,8 +221,10 @@ impl TotalGasOutput { pub(crate) fn write_benchmark_results( output_dir: &Path, gas_results: &[TotalGasRow], - combined_results: Vec, + combined_results: &[CombinedResult], ) -> eyre::Result<()> { + fs::create_dir_all(output_dir)?; + let output_path = output_dir.join(COMBINED_OUTPUT_SUFFIX); info!("Writing engine api call latency output to file: {:?}", output_path); let mut writer = Writer::from_path(&output_path)?; diff --git a/bin/reth-bench/src/bench/persistence_waiter.rs b/bin/reth-bench/src/bench/persistence_waiter.rs new file mode 100644 index 0000000000..4302204056 --- /dev/null +++ b/bin/reth-bench/src/bench/persistence_waiter.rs @@ -0,0 +1,267 @@ +//! Persistence waiting utilities for benchmarks. +//! +//! Provides waiting behavior to control benchmark pacing: +//! - **Fixed duration waits**: Sleep for a fixed time between blocks +//! - **Persistence-based waits**: Wait for blocks to be persisted using +//! `reth_subscribePersistedBlock` subscription + +use alloy_eips::BlockNumHash; +use alloy_network::Ethereum; +use alloy_provider::{Provider, RootProvider}; +use alloy_pubsub::SubscriptionStream; +use alloy_rpc_client::RpcClient; +use alloy_transport_ws::WsConnect; +use eyre::Context; +use futures::StreamExt; +use std::time::Duration; +use tracing::{debug, info}; +use url::Url; + +/// Default timeout for waiting on persistence. +pub(crate) const PERSISTENCE_CHECKPOINT_TIMEOUT: Duration = Duration::from_secs(60); + +/// Converts an engine API URL to the default RPC websocket URL. +/// +/// Transformations: +/// - `http` → `ws` +/// - `https` → `wss` +/// - `ws` / `wss` keep their scheme +/// - Port is always set to `8546`, reth's default RPC websocket port. +/// +/// This is used when we only know the engine API URL (typically `:8551`) but +/// need to connect to the node's WS RPC endpoint for persistence events. +pub(crate) fn engine_url_to_ws_url(engine_url: &str) -> eyre::Result { + let url: Url = engine_url + .parse() + .wrap_err_with(|| format!("Failed to parse engine RPC URL: {engine_url}"))?; + + let mut ws_url = url.clone(); + + match ws_url.scheme() { + "http" => ws_url + .set_scheme("ws") + .map_err(|_| eyre::eyre!("Failed to set WS scheme for URL: {url}"))?, + "https" => ws_url + .set_scheme("wss") + .map_err(|_| eyre::eyre!("Failed to set WSS scheme for URL: {url}"))?, + "ws" | "wss" => {} + scheme => { + return Err(eyre::eyre!( + "Unsupported URL scheme '{scheme}' for URL: {url}. Expected http, https, ws, or wss." + )) + } + } + + ws_url.set_port(Some(8546)).map_err(|_| eyre::eyre!("Failed to set port for URL: {url}"))?; + + Ok(ws_url) +} + +/// Waits until the persistence subscription reports that `target` has been persisted. +/// +/// Consumes subscription events until `last_persisted >= target`, or returns an error if: +/// - the subscription stream ends unexpectedly, or +/// - `timeout` elapses before `target` is observed. +async fn wait_for_persistence( + stream: &mut SubscriptionStream, + target: u64, + last_persisted: &mut u64, + timeout: Duration, +) -> eyre::Result<()> { + tokio::time::timeout(timeout, async { + while *last_persisted < target { + match stream.next().await { + Some(persisted) => { + *last_persisted = persisted.number; + debug!( + target: "reth-bench", + persisted_block = ?last_persisted, + "Received persistence notification" + ); + } + None => { + return Err(eyre::eyre!("Persistence subscription closed unexpectedly")); + } + } + } + Ok(()) + }) + .await + .map_err(|_| { + eyre::eyre!( + "Persistence timeout: target block {} not persisted within {:?}. Last persisted: {}", + target, + timeout, + last_persisted + ) + })? +} + +/// Wrapper that keeps both the subscription stream and the underlying provider alive. +/// The provider must be kept alive for the subscription to continue receiving events. +pub(crate) struct PersistenceSubscription { + _provider: RootProvider, + stream: SubscriptionStream, +} + +impl PersistenceSubscription { + const fn new( + provider: RootProvider, + stream: SubscriptionStream, + ) -> Self { + Self { _provider: provider, stream } + } + + const fn stream_mut(&mut self) -> &mut SubscriptionStream { + &mut self.stream + } +} + +/// Establishes a websocket connection and subscribes to `reth_subscribePersistedBlock`. +pub(crate) async fn setup_persistence_subscription( + ws_url: Url, +) -> eyre::Result { + info!("Connecting to WebSocket at {} for persistence subscription", ws_url); + + let ws_connect = WsConnect::new(ws_url.to_string()); + let client = RpcClient::connect_pubsub(ws_connect) + .await + .wrap_err("Failed to connect to WebSocket RPC endpoint")?; + let provider: RootProvider = RootProvider::new(client); + + let subscription = provider + .subscribe_to::("reth_subscribePersistedBlock") + .await + .wrap_err("Failed to subscribe to persistence notifications")?; + + info!("Subscribed to persistence notifications"); + + Ok(PersistenceSubscription::new(provider, subscription.into_stream())) +} + +/// Encapsulates the block waiting logic. +/// +/// Provides a simple `on_block()` interface that handles both: +/// - Fixed duration waits (when `wait_time` is set) +/// - Persistence-based waits (when `subscription` is set) +/// +/// For persistence mode, waits after every `(threshold + 1)` blocks. +pub(crate) struct PersistenceWaiter { + wait_time: Option, + subscription: Option, + blocks_sent: u64, + last_persisted: u64, + threshold: u64, + timeout: Duration, +} + +impl PersistenceWaiter { + pub(crate) const fn with_duration(wait_time: Duration) -> Self { + Self { + wait_time: Some(wait_time), + subscription: None, + blocks_sent: 0, + last_persisted: 0, + threshold: 0, + timeout: Duration::ZERO, + } + } + + pub(crate) const fn with_subscription( + subscription: PersistenceSubscription, + threshold: u64, + timeout: Duration, + ) -> Self { + Self { + wait_time: None, + subscription: Some(subscription), + blocks_sent: 0, + last_persisted: 0, + threshold, + timeout, + } + } + + /// Called once per block. Waits based on the configured mode. + #[allow(clippy::manual_is_multiple_of)] + pub(crate) async fn on_block(&mut self, block_number: u64) -> eyre::Result<()> { + if let Some(wait_time) = self.wait_time { + tokio::time::sleep(wait_time).await; + return Ok(()); + } + + let Some(ref mut subscription) = self.subscription else { + return Ok(()); + }; + + self.blocks_sent += 1; + + if self.blocks_sent % (self.threshold + 1) == 0 { + debug!( + target: "reth-bench", + target_block = ?block_number, + last_persisted = self.last_persisted, + blocks_sent = self.blocks_sent, + "Waiting for persistence" + ); + + wait_for_persistence( + subscription.stream_mut(), + block_number, + &mut self.last_persisted, + self.timeout, + ) + .await?; + + debug!( + target: "reth-bench", + persisted = self.last_persisted, + "Persistence caught up" + ); + } + + Ok(()) + } +} + +#[cfg(test)] +mod tests { + use super::*; + use std::time::Instant; + + #[test] + fn test_engine_url_to_ws_url() { + // http -> ws, always uses port 8546 + let result = engine_url_to_ws_url("http://localhost:8551").unwrap(); + assert_eq!(result.as_str(), "ws://localhost:8546/"); + + // https -> wss + let result = engine_url_to_ws_url("https://localhost:8551").unwrap(); + assert_eq!(result.as_str(), "wss://localhost:8546/"); + + // Custom engine port still maps to 8546 + let result = engine_url_to_ws_url("http://localhost:9551").unwrap(); + assert_eq!(result.port(), Some(8546)); + + // Already ws passthrough + let result = engine_url_to_ws_url("ws://localhost:8546").unwrap(); + assert_eq!(result.scheme(), "ws"); + + // Invalid inputs + assert!(engine_url_to_ws_url("ftp://localhost:8551").is_err()); + assert!(engine_url_to_ws_url("not a valid url").is_err()); + } + + #[tokio::test] + async fn test_waiter_with_duration() { + let mut waiter = PersistenceWaiter::with_duration(Duration::from_millis(1)); + + let start = Instant::now(); + waiter.on_block(1).await.unwrap(); + waiter.on_block(2).await.unwrap(); + waiter.on_block(3).await.unwrap(); + + // Should have waited ~3ms total + assert!(start.elapsed() >= Duration::from_millis(3)); + } +} diff --git a/bin/reth-bench/src/bench/replay_payloads.rs b/bin/reth-bench/src/bench/replay_payloads.rs index e6d388d9ef..cf1b82f5be 100644 --- a/bin/reth-bench/src/bench/replay_payloads.rs +++ b/bin/reth-bench/src/bench/replay_payloads.rs @@ -2,10 +2,27 @@ //! //! This command reads `ExecutionPayloadEnvelopeV4` files from a directory and replays them //! in sequence using `newPayload` followed by `forkchoiceUpdated`. +//! +//! Supports configurable waiting behavior: +//! - **`--wait-time`**: Fixed sleep interval between blocks. +//! - **`--wait-for-persistence`**: Waits for every Nth block to be persisted using the +//! `reth_subscribePersistedBlock` subscription, where N matches the engine's persistence +//! threshold. This ensures the benchmark doesn't outpace persistence. +//! +//! Both options can be used together or independently. use crate::{ authenticated_transport::AuthenticatedTransportConnect, - bench::output::GasRampPayloadFile, + bench::{ + output::{ + write_benchmark_results, CombinedResult, GasRampPayloadFile, NewPayloadResult, + TotalGasOutput, TotalGasRow, + }, + persistence_waiter::{ + engine_url_to_ws_url, setup_persistence_subscription, PersistenceWaiter, + PERSISTENCE_CHECKPOINT_TIMEOUT, + }, + }, valid_payload::{call_forkchoice_updated, call_new_payload}, }; use alloy_primitives::B256; @@ -14,11 +31,16 @@ use alloy_rpc_client::ClientBuilder; use alloy_rpc_types_engine::{ExecutionPayloadEnvelopeV4, ForkchoiceState, JwtSecret}; use clap::Parser; use eyre::Context; -use reqwest::Url; +use humantime::parse_duration; use reth_cli_runner::CliContext; +use reth_engine_primitives::config::DEFAULT_PERSISTENCE_THRESHOLD; use reth_node_api::EngineApiMessageVersion; -use std::path::PathBuf; +use std::{ + path::PathBuf, + time::{Duration, Instant}, +}; use tracing::{debug, info}; +use url::Url; /// `reth bench replay-payloads` command /// @@ -51,6 +73,42 @@ pub struct Command { /// These are replayed before the main payloads to warm up the gas limit. #[arg(long, value_name = "GAS_RAMP_DIR")] gas_ramp_dir: Option, + + /// Optional output directory for benchmark results (CSV files). + #[arg(long, value_name = "OUTPUT")] + output: Option, + + /// How long to wait after a forkchoice update before sending the next payload. + #[arg(long, value_name = "WAIT_TIME", value_parser = parse_duration, verbatim_doc_comment)] + wait_time: Option, + + /// Wait for blocks to be persisted before sending the next batch. + /// + /// When enabled, waits for every Nth block to be persisted using the + /// `reth_subscribePersistedBlock` subscription. This ensures the benchmark + /// doesn't outpace persistence. + /// + /// The subscription uses the regular RPC websocket endpoint (no JWT required). + #[arg(long, default_value = "false", verbatim_doc_comment)] + wait_for_persistence: bool, + + /// Engine persistence threshold used for deciding when to wait for persistence. + /// + /// The benchmark waits after every `(threshold + 1)` blocks. By default this + /// matches the engine's `DEFAULT_PERSISTENCE_THRESHOLD` (2), so waits occur + /// at blocks 3, 6, 9, etc. + #[arg( + long = "persistence-threshold", + value_name = "PERSISTENCE_THRESHOLD", + default_value_t = DEFAULT_PERSISTENCE_THRESHOLD, + verbatim_doc_comment + )] + persistence_threshold: u64, + + /// Optional `WebSocket` RPC URL for persistence subscription. + /// If not provided, derives from engine RPC URL by changing scheme to ws and port to 8546. + #[arg(long, value_name = "WS_RPC_URL", verbatim_doc_comment)] + ws_rpc_url: Option, } /// A loaded payload ready for execution. @@ -78,6 +136,33 @@ impl Command { pub async fn execute(self, _ctx: CliContext) -> eyre::Result<()> { info!(payload_dir = %self.payload_dir.display(), "Replaying payloads"); + // Log mode configuration + if let Some(duration) = self.wait_time { + info!("Using wait-time mode with {}ms delay between blocks", duration.as_millis()); + } + if self.wait_for_persistence { + info!( + "Persistence waiting enabled (waits after every {} blocks to match engine gap > {} behavior)", + self.persistence_threshold + 1, + self.persistence_threshold + ); + } + + // Set up waiter based on configured options (duration takes precedence) + let mut waiter = match (self.wait_time, self.wait_for_persistence) { + (Some(duration), _) => Some(PersistenceWaiter::with_duration(duration)), + (None, true) => { + let ws_url = self.derive_ws_rpc_url()?; + let sub = setup_persistence_subscription(ws_url).await?; + Some(PersistenceWaiter::with_subscription( + sub, + self.persistence_threshold, + PERSISTENCE_CHECKPOINT_TIMEOUT, + )) + } + (None, false) => None, + }; + // Set up authenticated engine provider let jwt = std::fs::read_to_string(&self.jwt_secret).wrap_err("Failed to read JWT secret file")?; @@ -144,6 +229,11 @@ impl Command { call_forkchoice_updated(&auth_provider, payload.version, fcu_state, None).await?; info!(gas_ramp_payload = i + 1, "Gas ramp payload executed successfully"); + + if let Some(w) = &mut waiter { + w.on_block(payload.block_number).await?; + } + parent_hash = payload.file.block_hash; } @@ -151,22 +241,112 @@ impl Command { info!(count = gas_ramp_payloads.len(), "All gas ramp payloads replayed"); } + let mut results = Vec::new(); + let total_benchmark_duration = Instant::now(); + for (i, payload) in payloads.iter().enumerate() { - info!( + let envelope = &payload.envelope; + let block_hash = payload.block_hash; + let execution_payload = &envelope.envelope_inner.execution_payload; + let inner_payload = &execution_payload.payload_inner.payload_inner; + + let gas_used = inner_payload.gas_used; + let gas_limit = inner_payload.gas_limit; + let block_number = inner_payload.block_number; + let transaction_count = + execution_payload.payload_inner.payload_inner.transactions.len() as u64; + + debug!( payload = i + 1, total = payloads.len(), index = payload.index, - block_hash = %payload.block_hash, + block_hash = %block_hash, "Executing payload (newPayload + FCU)" ); - self.execute_payload_v4(&auth_provider, &payload.envelope, parent_hash).await?; + let start = Instant::now(); - info!(payload = i + 1, "Payload executed successfully"); - parent_hash = payload.block_hash; + debug!( + method = "engine_newPayloadV4", + block_hash = %block_hash, + "Sending newPayload" + ); + + let status = auth_provider + .new_payload_v4( + execution_payload.clone(), + vec![], + B256::ZERO, + envelope.execution_requests.to_vec(), + ) + .await?; + + let new_payload_result = NewPayloadResult { gas_used, latency: start.elapsed() }; + + if !status.is_valid() { + return Err(eyre::eyre!("Payload rejected: {:?}", status)); + } + + let fcu_state = ForkchoiceState { + head_block_hash: block_hash, + safe_block_hash: parent_hash, + finalized_block_hash: parent_hash, + }; + + debug!(method = "engine_forkchoiceUpdatedV3", ?fcu_state, "Sending forkchoiceUpdated"); + + let fcu_result = auth_provider.fork_choice_updated_v3(fcu_state, None).await?; + + let total_latency = start.elapsed(); + let fcu_latency = total_latency - new_payload_result.latency; + + let combined_result = CombinedResult { + block_number, + gas_limit, + transaction_count, + new_payload_result, + fcu_latency, + total_latency, + }; + + let current_duration = total_benchmark_duration.elapsed(); + info!(%combined_result); + + if let Some(w) = &mut waiter { + w.on_block(block_number).await?; + } + + let gas_row = + TotalGasRow { block_number, transaction_count, gas_used, time: current_duration }; + results.push((gas_row, combined_result)); + + debug!(?status, ?fcu_result, "Payload executed successfully"); + parent_hash = block_hash; } - info!(count = payloads.len(), "All payloads replayed successfully"); + // Drop waiter - we don't need to wait for final blocks to persist + // since the benchmark goal is measuring Ggas/s of newPayload/FCU, not persistence. + drop(waiter); + + let (gas_output_results, combined_results): (Vec, Vec) = + results.into_iter().unzip(); + + if let Some(ref path) = self.output { + write_benchmark_results(path, &gas_output_results, &combined_results)?; + } + + let gas_output = + TotalGasOutput::with_combined_results(gas_output_results, &combined_results)?; + info!( + total_gas_used = gas_output.total_gas_used, + total_duration = ?gas_output.total_duration, + execution_duration = ?gas_output.execution_duration, + blocks_processed = gas_output.blocks_processed, + wall_clock_ggas_per_second = format_args!("{:.4}", gas_output.total_gigagas_per_second()), + execution_ggas_per_second = format_args!("{:.4}", gas_output.execution_gigagas_per_second()), + "Benchmark complete" + ); + Ok(()) } @@ -285,48 +465,32 @@ impl Command { Ok(payloads) } - async fn execute_payload_v4( - &self, - provider: &RootProvider, - envelope: &ExecutionPayloadEnvelopeV4, - parent_hash: B256, - ) -> eyre::Result<()> { - let block_hash = - envelope.envelope_inner.execution_payload.payload_inner.payload_inner.block_hash; - - debug!( - method = "engine_newPayloadV4", - block_hash = %block_hash, - "Sending newPayload" - ); - - let status = provider - .new_payload_v4( - envelope.envelope_inner.execution_payload.clone(), - vec![], - B256::ZERO, - envelope.execution_requests.to_vec(), - ) - .await?; - - info!(?status, "newPayloadV4 response"); - - if !status.is_valid() { - return Err(eyre::eyre!("Payload rejected: {:?}", status)); + /// Returns the websocket RPC URL used for the persistence subscription. + /// + /// Preference: + /// - If `--ws-rpc-url` is provided, use it directly. + /// - Otherwise, derive a WS RPC URL from `--engine-rpc-url`. + /// + /// The persistence subscription endpoint (`reth_subscribePersistedBlock`) is exposed on + /// the regular RPC server (WS port, usually 8546), not on the engine API port (usually 8551). + /// Since we only have the engine URL by default, we convert the scheme + /// (http→ws, https→wss) and force the port to 8546. + fn derive_ws_rpc_url(&self) -> eyre::Result { + if let Some(ref ws_url) = self.ws_rpc_url { + let parsed: Url = ws_url + .parse() + .wrap_err_with(|| format!("Failed to parse WebSocket RPC URL: {ws_url}"))?; + info!(target: "reth-bench", ws_url = %parsed, "Using provided WebSocket RPC URL"); + Ok(parsed) + } else { + let derived = engine_url_to_ws_url(&self.engine_rpc_url)?; + debug!( + target: "reth-bench", + engine_url = %self.engine_rpc_url, + %derived, + "Derived WebSocket RPC URL from engine RPC URL" + ); + Ok(derived) } - - let fcu_state = ForkchoiceState { - head_block_hash: block_hash, - safe_block_hash: parent_hash, - finalized_block_hash: parent_hash, - }; - - debug!(method = "engine_forkchoiceUpdatedV3", ?fcu_state, "Sending forkchoiceUpdated"); - - let fcu_result = provider.fork_choice_updated_v3(fcu_state, None).await?; - - info!(?fcu_result, "forkchoiceUpdatedV3 response"); - - Ok(()) } }