diff --git a/bin/reth-bench/src/bench/new_payload_fcu.rs b/bin/reth-bench/src/bench/new_payload_fcu.rs index e92556765b..a8b17df14a 100644 --- a/bin/reth-bench/src/bench/new_payload_fcu.rs +++ b/bin/reth-bench/src/bench/new_payload_fcu.rs @@ -16,7 +16,7 @@ use crate::{ write_benchmark_results, CombinedResult, NewPayloadResult, TotalGasOutput, TotalGasRow, }, persistence_waiter::{ - engine_url_to_ws_url, setup_persistence_subscription, PersistenceWaiter, + derive_ws_rpc_url, setup_persistence_subscription, PersistenceWaiter, PERSISTENCE_CHECKPOINT_TIMEOUT, }, }, @@ -32,7 +32,6 @@ use reth_engine_primitives::config::DEFAULT_PERSISTENCE_THRESHOLD; use reth_node_core::args::BenchmarkArgs; use std::time::{Duration, Instant}; use tracing::{debug, info}; -use url::Url; /// `reth benchmark new-payload-fcu` command #[derive(Debug, Parser)] @@ -101,7 +100,10 @@ impl Command { let mut waiter = match (self.wait_time, self.wait_for_persistence) { (Some(duration), _) => Some(PersistenceWaiter::with_duration(duration)), (None, true) => { - let ws_url = self.derive_ws_rpc_url()?; + let ws_url = derive_ws_rpc_url( + self.benchmark.ws_rpc_url.as_deref(), + &self.benchmark.engine_rpc_url, + )?; let sub = setup_persistence_subscription(ws_url).await?; Some(PersistenceWaiter::with_subscription( sub, @@ -260,33 +262,4 @@ impl Command { Ok(()) } - - /// Returns the websocket RPC URL used for the persistence subscription. - /// - /// Preference: - /// - If `--ws-rpc-url` is provided, use it directly. - /// - Otherwise, derive a WS RPC URL from `--engine-rpc-url`. - /// - /// The persistence subscription endpoint (`reth_subscribePersistedBlock`) is exposed on - /// the regular RPC server (WS port, usually 8546), not on the engine API port (usually 8551). - /// Since `BenchmarkArgs` only has the engine URL by default, we convert the scheme - /// (http→ws, https→wss) and force the port to 8546. - fn derive_ws_rpc_url(&self) -> eyre::Result { - if let Some(ref ws_url) = self.benchmark.ws_rpc_url { - let parsed: Url = ws_url - .parse() - .wrap_err_with(|| format!("Failed to parse WebSocket RPC URL: {ws_url}"))?; - info!(target: "reth-bench", ws_url = %parsed, "Using provided WebSocket RPC URL"); - Ok(parsed) - } else { - let derived = engine_url_to_ws_url(&self.benchmark.engine_rpc_url)?; - debug!( - target: "reth-bench", - engine_url = %self.benchmark.engine_rpc_url, - %derived, - "Derived WebSocket RPC URL from engine RPC URL" - ); - Ok(derived) - } - } } diff --git a/bin/reth-bench/src/bench/persistence_waiter.rs b/bin/reth-bench/src/bench/persistence_waiter.rs index 4302204056..9b50369c03 100644 --- a/bin/reth-bench/src/bench/persistence_waiter.rs +++ b/bin/reth-bench/src/bench/persistence_waiter.rs @@ -15,11 +15,46 @@ use eyre::Context; use futures::StreamExt; use std::time::Duration; use tracing::{debug, info}; + +/// Default `WebSocket` RPC port for reth. +const DEFAULT_WS_RPC_PORT: u16 = 8546; use url::Url; /// Default timeout for waiting on persistence. pub(crate) const PERSISTENCE_CHECKPOINT_TIMEOUT: Duration = Duration::from_secs(60); +/// Returns the websocket RPC URL used for the persistence subscription. +/// +/// Preference: +/// - If `ws_rpc_url` is provided, use it directly. +/// - Otherwise, derive a WS RPC URL from `engine_rpc_url`. +/// +/// The persistence subscription endpoint (`reth_subscribePersistedBlock`) is exposed on +/// the regular RPC server (WS port, usually 8546), not on the engine API port (usually 8551). +/// Since we may only have the engine URL by default, we convert the scheme +/// (http→ws, https→wss) and force the port to 8546. +pub(crate) fn derive_ws_rpc_url( + ws_rpc_url: Option<&str>, + engine_rpc_url: &str, +) -> eyre::Result { + if let Some(ws_url) = ws_rpc_url { + let parsed: Url = ws_url + .parse() + .wrap_err_with(|| format!("Failed to parse WebSocket RPC URL: {ws_url}"))?; + info!(target: "reth-bench", ws_url = %parsed, "Using provided WebSocket RPC URL"); + Ok(parsed) + } else { + let derived = engine_url_to_ws_url(engine_rpc_url)?; + debug!( + target: "reth-bench", + engine_url = %engine_rpc_url, + %derived, + "Derived WebSocket RPC URL from engine RPC URL" + ); + Ok(derived) + } +} + /// Converts an engine API URL to the default RPC websocket URL. /// /// Transformations: @@ -30,7 +65,7 @@ pub(crate) const PERSISTENCE_CHECKPOINT_TIMEOUT: Duration = Duration::from_secs( /// /// This is used when we only know the engine API URL (typically `:8551`) but /// need to connect to the node's WS RPC endpoint for persistence events. -pub(crate) fn engine_url_to_ws_url(engine_url: &str) -> eyre::Result { +fn engine_url_to_ws_url(engine_url: &str) -> eyre::Result { let url: Url = engine_url .parse() .wrap_err_with(|| format!("Failed to parse engine RPC URL: {engine_url}"))?; @@ -52,7 +87,9 @@ pub(crate) fn engine_url_to_ws_url(engine_url: &str) -> eyre::Result { } } - ws_url.set_port(Some(8546)).map_err(|_| eyre::eyre!("Failed to set port for URL: {url}"))?; + ws_url + .set_port(Some(DEFAULT_WS_RPC_PORT)) + .map_err(|_| eyre::eyre!("Failed to set port for URL: {url}"))?; Ok(ws_url) } diff --git a/bin/reth-bench/src/bench/replay_payloads.rs b/bin/reth-bench/src/bench/replay_payloads.rs index cf1b82f5be..20181558b8 100644 --- a/bin/reth-bench/src/bench/replay_payloads.rs +++ b/bin/reth-bench/src/bench/replay_payloads.rs @@ -19,7 +19,7 @@ use crate::{ TotalGasOutput, TotalGasRow, }, persistence_waiter::{ - engine_url_to_ws_url, setup_persistence_subscription, PersistenceWaiter, + derive_ws_rpc_url, setup_persistence_subscription, PersistenceWaiter, PERSISTENCE_CHECKPOINT_TIMEOUT, }, }, @@ -152,7 +152,7 @@ impl Command { let mut waiter = match (self.wait_time, self.wait_for_persistence) { (Some(duration), _) => Some(PersistenceWaiter::with_duration(duration)), (None, true) => { - let ws_url = self.derive_ws_rpc_url()?; + let ws_url = derive_ws_rpc_url(self.ws_rpc_url.as_deref(), &self.engine_rpc_url)?; let sub = setup_persistence_subscription(ws_url).await?; Some(PersistenceWaiter::with_subscription( sub, @@ -464,33 +464,4 @@ impl Command { Ok(payloads) } - - /// Returns the websocket RPC URL used for the persistence subscription. - /// - /// Preference: - /// - If `--ws-rpc-url` is provided, use it directly. - /// - Otherwise, derive a WS RPC URL from `--engine-rpc-url`. - /// - /// The persistence subscription endpoint (`reth_subscribePersistedBlock`) is exposed on - /// the regular RPC server (WS port, usually 8546), not on the engine API port (usually 8551). - /// Since we only have the engine URL by default, we convert the scheme - /// (http→ws, https→wss) and force the port to 8546. - fn derive_ws_rpc_url(&self) -> eyre::Result { - if let Some(ref ws_url) = self.ws_rpc_url { - let parsed: Url = ws_url - .parse() - .wrap_err_with(|| format!("Failed to parse WebSocket RPC URL: {ws_url}"))?; - info!(target: "reth-bench", ws_url = %parsed, "Using provided WebSocket RPC URL"); - Ok(parsed) - } else { - let derived = engine_url_to_ws_url(&self.engine_rpc_url)?; - debug!( - target: "reth-bench", - engine_url = %self.engine_rpc_url, - %derived, - "Derived WebSocket RPC URL from engine RPC URL" - ); - Ok(derived) - } - } }