diff --git a/bin/reth-bench-compare/src/benchmark.rs b/bin/reth-bench-compare/src/benchmark.rs index a6b0935dc1..8c6bf37a5a 100644 --- a/bin/reth-bench-compare/src/benchmark.rs +++ b/bin/reth-bench-compare/src/benchmark.rs @@ -186,10 +186,12 @@ impl BenchmarkRunner { &output_dir.to_string_lossy(), ]); - // Configure wait mode: wait-time takes precedence over persistence-based flow + // Configure wait mode: both can be used together + // When both are set: wait at least wait_time, and also wait for persistence if needed if let Some(ref wait_time) = self.wait_time { cmd.args(["--wait-time", wait_time]); - } else if self.wait_for_persistence { + } + if self.wait_for_persistence { cmd.arg("--wait-for-persistence"); // Add persistence threshold if specified diff --git a/bin/reth-bench-compare/src/cli.rs b/bin/reth-bench-compare/src/cli.rs index 4755368c5f..d176c0ece6 100644 --- a/bin/reth-bench-compare/src/cli.rs +++ b/bin/reth-bench-compare/src/cli.rs @@ -116,9 +116,9 @@ pub(crate) struct Args { /// Optional fixed delay between engine API calls (passed to reth-bench). /// - /// When set, reth-bench uses wait-time mode and disables persistence-based flow. - /// This flag remains for compatibility with older scripts. - #[arg(long, value_name = "DURATION", hide = true)] + /// Can be combined with `--wait-for-persistence`: when both are set, + /// waits at least this duration, and also waits for persistence if needed. + #[arg(long, value_name = "DURATION")] pub wait_time: Option, /// Wait for blocks to be persisted before sending the next batch (passed to reth-bench). @@ -126,6 +126,9 @@ pub(crate) struct Args { /// When enabled, waits for every Nth block to be persisted using the /// `reth_subscribePersistedBlock` subscription. This ensures the benchmark /// doesn't outpace persistence. + /// + /// Can be combined with `--wait-time`: when both are set, waits at least + /// wait-time, and also waits for persistence if the block hasn't been persisted yet. #[arg(long)] pub wait_for_persistence: bool, diff --git a/bin/reth-bench/src/bench/new_payload_fcu.rs b/bin/reth-bench/src/bench/new_payload_fcu.rs index a8b17df14a..17e4261ec2 100644 --- a/bin/reth-bench/src/bench/new_payload_fcu.rs +++ b/bin/reth-bench/src/bench/new_payload_fcu.rs @@ -96,9 +96,23 @@ impl Command { ); } - // Set up waiter based on configured options (duration takes precedence) + // Set up waiter based on configured options + // When both are set: wait at least wait_time, and also wait for persistence if needed let mut waiter = match (self.wait_time, self.wait_for_persistence) { - (Some(duration), _) => Some(PersistenceWaiter::with_duration(duration)), + (Some(duration), true) => { + let ws_url = derive_ws_rpc_url( + self.benchmark.ws_rpc_url.as_deref(), + &self.benchmark.engine_rpc_url, + )?; + let sub = setup_persistence_subscription(ws_url).await?; + Some(PersistenceWaiter::with_duration_and_subscription( + duration, + sub, + self.persistence_threshold, + PERSISTENCE_CHECKPOINT_TIMEOUT, + )) + } + (Some(duration), false) => Some(PersistenceWaiter::with_duration(duration)), (None, true) => { let ws_url = derive_ws_rpc_url( self.benchmark.ws_rpc_url.as_deref(), diff --git a/bin/reth-bench/src/bench/persistence_waiter.rs b/bin/reth-bench/src/bench/persistence_waiter.rs index 9b50369c03..df09d625b4 100644 --- a/bin/reth-bench/src/bench/persistence_waiter.rs +++ b/bin/reth-bench/src/bench/persistence_waiter.rs @@ -4,6 +4,8 @@ //! - **Fixed duration waits**: Sleep for a fixed time between blocks //! - **Persistence-based waits**: Wait for blocks to be persisted using //! `reth_subscribePersistedBlock` subscription +//! - **Combined mode**: Wait at least the fixed duration, and also wait for persistence if the +//! block hasn't been persisted yet (whichever takes longer) use alloy_eips::BlockNumHash; use alloy_network::Ethereum; @@ -219,14 +221,39 @@ impl PersistenceWaiter { } } + /// Creates a waiter that combines both duration and persistence waiting. + /// + /// Waits at least `wait_time` between blocks, and also waits for persistence + /// if the block hasn't been persisted yet (whichever takes longer). + pub(crate) const fn with_duration_and_subscription( + wait_time: Duration, + subscription: PersistenceSubscription, + threshold: u64, + timeout: Duration, + ) -> Self { + Self { + wait_time: Some(wait_time), + subscription: Some(subscription), + blocks_sent: 0, + last_persisted: 0, + threshold, + timeout, + } + } + /// Called once per block. Waits based on the configured mode. + /// + /// When both `wait_time` and `subscription` are set (combined mode): + /// - Always waits at least `wait_time` + /// - Additionally waits for persistence if we're at a persistence checkpoint #[allow(clippy::manual_is_multiple_of)] pub(crate) async fn on_block(&mut self, block_number: u64) -> eyre::Result<()> { + // Always wait for the fixed duration if configured if let Some(wait_time) = self.wait_time { tokio::time::sleep(wait_time).await; - return Ok(()); } + // Check persistence if subscription is configured let Some(ref mut subscription) = self.subscription else { return Ok(()); }; diff --git a/bin/reth-bench/src/bench/replay_payloads.rs b/bin/reth-bench/src/bench/replay_payloads.rs index 20181558b8..977017ef59 100644 --- a/bin/reth-bench/src/bench/replay_payloads.rs +++ b/bin/reth-bench/src/bench/replay_payloads.rs @@ -148,9 +148,20 @@ impl Command { ); } - // Set up waiter based on configured options (duration takes precedence) + // Set up waiter based on configured options + // When both are set: wait at least wait_time, and also wait for persistence if needed let mut waiter = match (self.wait_time, self.wait_for_persistence) { - (Some(duration), _) => Some(PersistenceWaiter::with_duration(duration)), + (Some(duration), true) => { + let ws_url = derive_ws_rpc_url(self.ws_rpc_url.as_deref(), &self.engine_rpc_url)?; + let sub = setup_persistence_subscription(ws_url).await?; + Some(PersistenceWaiter::with_duration_and_subscription( + duration, + sub, + self.persistence_threshold, + PERSISTENCE_CHECKPOINT_TIMEOUT, + )) + } + (Some(duration), false) => Some(PersistenceWaiter::with_duration(duration)), (None, true) => { let ws_url = derive_ws_rpc_url(self.ws_rpc_url.as_deref(), &self.engine_rpc_url)?; let sub = setup_persistence_subscription(ws_url).await?;