feat(reth-bench): support combined wait-time and wait-for-persistence modes (#21771)

Co-authored-by: Amp <amp@ampcode.com>
This commit is contained in:
Georgios Konstantopoulos
2026-02-04 03:29:08 -08:00
committed by GitHub
parent 32c08b7ddb
commit 4ae60f3302
5 changed files with 67 additions and 10 deletions

View File

@@ -186,10 +186,12 @@ impl BenchmarkRunner {
&output_dir.to_string_lossy(),
]);
// Configure wait mode: wait-time takes precedence over persistence-based flow
// Configure wait mode: both can be used together
// When both are set: wait at least wait_time, and also wait for persistence if needed
if let Some(ref wait_time) = self.wait_time {
cmd.args(["--wait-time", wait_time]);
} else if self.wait_for_persistence {
}
if self.wait_for_persistence {
cmd.arg("--wait-for-persistence");
// Add persistence threshold if specified

View File

@@ -116,9 +116,9 @@ pub(crate) struct Args {
/// Optional fixed delay between engine API calls (passed to reth-bench).
///
/// When set, reth-bench uses wait-time mode and disables persistence-based flow.
/// This flag remains for compatibility with older scripts.
#[arg(long, value_name = "DURATION", hide = true)]
/// Can be combined with `--wait-for-persistence`: when both are set,
/// waits at least this duration, and also waits for persistence if needed.
#[arg(long, value_name = "DURATION")]
pub wait_time: Option<String>,
/// Wait for blocks to be persisted before sending the next batch (passed to reth-bench).
@@ -126,6 +126,9 @@ pub(crate) struct Args {
/// When enabled, waits for every Nth block to be persisted using the
/// `reth_subscribePersistedBlock` subscription. This ensures the benchmark
/// doesn't outpace persistence.
///
/// Can be combined with `--wait-time`: when both are set, waits at least
/// wait-time, and also waits for persistence if the block hasn't been persisted yet.
#[arg(long)]
pub wait_for_persistence: bool,

View File

@@ -96,9 +96,23 @@ impl Command {
);
}
// Set up waiter based on configured options (duration takes precedence)
// Set up waiter based on configured options
// When both are set: wait at least wait_time, and also wait for persistence if needed
let mut waiter = match (self.wait_time, self.wait_for_persistence) {
(Some(duration), _) => Some(PersistenceWaiter::with_duration(duration)),
(Some(duration), true) => {
let ws_url = derive_ws_rpc_url(
self.benchmark.ws_rpc_url.as_deref(),
&self.benchmark.engine_rpc_url,
)?;
let sub = setup_persistence_subscription(ws_url).await?;
Some(PersistenceWaiter::with_duration_and_subscription(
duration,
sub,
self.persistence_threshold,
PERSISTENCE_CHECKPOINT_TIMEOUT,
))
}
(Some(duration), false) => Some(PersistenceWaiter::with_duration(duration)),
(None, true) => {
let ws_url = derive_ws_rpc_url(
self.benchmark.ws_rpc_url.as_deref(),

View File

@@ -4,6 +4,8 @@
//! - **Fixed duration waits**: Sleep for a fixed time between blocks
//! - **Persistence-based waits**: Wait for blocks to be persisted using
//! `reth_subscribePersistedBlock` subscription
//! - **Combined mode**: Wait at least the fixed duration, and also wait for persistence if the
//! block hasn't been persisted yet (whichever takes longer)
use alloy_eips::BlockNumHash;
use alloy_network::Ethereum;
@@ -219,14 +221,39 @@ impl PersistenceWaiter {
}
}
/// Creates a waiter that combines both duration and persistence waiting.
///
/// Waits at least `wait_time` between blocks, and also waits for persistence
/// if the block hasn't been persisted yet (whichever takes longer).
pub(crate) const fn with_duration_and_subscription(
wait_time: Duration,
subscription: PersistenceSubscription,
threshold: u64,
timeout: Duration,
) -> Self {
Self {
wait_time: Some(wait_time),
subscription: Some(subscription),
blocks_sent: 0,
last_persisted: 0,
threshold,
timeout,
}
}
/// Called once per block. Waits based on the configured mode.
///
/// When both `wait_time` and `subscription` are set (combined mode):
/// - Always waits at least `wait_time`
/// - Additionally waits for persistence if we're at a persistence checkpoint
#[allow(clippy::manual_is_multiple_of)]
pub(crate) async fn on_block(&mut self, block_number: u64) -> eyre::Result<()> {
// Always wait for the fixed duration if configured
if let Some(wait_time) = self.wait_time {
tokio::time::sleep(wait_time).await;
return Ok(());
}
// Check persistence if subscription is configured
let Some(ref mut subscription) = self.subscription else {
return Ok(());
};

View File

@@ -148,9 +148,20 @@ impl Command {
);
}
// Set up waiter based on configured options (duration takes precedence)
// Set up waiter based on configured options
// When both are set: wait at least wait_time, and also wait for persistence if needed
let mut waiter = match (self.wait_time, self.wait_for_persistence) {
(Some(duration), _) => Some(PersistenceWaiter::with_duration(duration)),
(Some(duration), true) => {
let ws_url = derive_ws_rpc_url(self.ws_rpc_url.as_deref(), &self.engine_rpc_url)?;
let sub = setup_persistence_subscription(ws_url).await?;
Some(PersistenceWaiter::with_duration_and_subscription(
duration,
sub,
self.persistence_threshold,
PERSISTENCE_CHECKPOINT_TIMEOUT,
))
}
(Some(duration), false) => Some(PersistenceWaiter::with_duration(duration)),
(None, true) => {
let ws_url = derive_ws_rpc_url(self.ws_rpc_url.as_deref(), &self.engine_rpc_url)?;
let sub = setup_persistence_subscription(ws_url).await?;