mirror of
https://github.com/paradigmxyz/reth.git
synced 2026-02-19 03:04:27 -05:00
feat(reth-bench): add reporting and wait options to replay-payloads (#21537)
This commit is contained in:
@@ -15,6 +15,7 @@ pub use generate_big_block::{
|
||||
mod new_payload_fcu;
|
||||
mod new_payload_only;
|
||||
mod output;
|
||||
mod persistence_waiter;
|
||||
mod replay_payloads;
|
||||
mod send_invalid_payload;
|
||||
mod send_payload;
|
||||
|
||||
@@ -15,19 +15,17 @@ use crate::{
|
||||
output::{
|
||||
write_benchmark_results, CombinedResult, NewPayloadResult, TotalGasOutput, TotalGasRow,
|
||||
},
|
||||
persistence_waiter::{
|
||||
engine_url_to_ws_url, setup_persistence_subscription, PersistenceWaiter,
|
||||
PERSISTENCE_CHECKPOINT_TIMEOUT,
|
||||
},
|
||||
},
|
||||
valid_payload::{block_to_new_payload, call_forkchoice_updated, call_new_payload},
|
||||
};
|
||||
use alloy_eips::BlockNumHash;
|
||||
use alloy_network::Ethereum;
|
||||
use alloy_provider::{Provider, RootProvider};
|
||||
use alloy_pubsub::SubscriptionStream;
|
||||
use alloy_rpc_client::RpcClient;
|
||||
use alloy_provider::Provider;
|
||||
use alloy_rpc_types_engine::ForkchoiceState;
|
||||
use alloy_transport_ws::WsConnect;
|
||||
use clap::Parser;
|
||||
use eyre::{Context, OptionExt};
|
||||
use futures::StreamExt;
|
||||
use humantime::parse_duration;
|
||||
use reth_cli_runner::CliContext;
|
||||
use reth_engine_primitives::config::DEFAULT_PERSISTENCE_THRESHOLD;
|
||||
@@ -36,8 +34,6 @@ use std::time::{Duration, Instant};
|
||||
use tracing::{debug, info};
|
||||
use url::Url;
|
||||
|
||||
const PERSISTENCE_CHECKPOINT_TIMEOUT: Duration = Duration::from_secs(60);
|
||||
|
||||
/// `reth benchmark new-payload-fcu` command
|
||||
#[derive(Debug, Parser)]
|
||||
pub struct Command {
|
||||
@@ -105,7 +101,8 @@ impl Command {
|
||||
let mut waiter = match (self.wait_time, self.wait_for_persistence) {
|
||||
(Some(duration), _) => Some(PersistenceWaiter::with_duration(duration)),
|
||||
(None, true) => {
|
||||
let sub = self.setup_persistence_subscription().await?;
|
||||
let ws_url = self.derive_ws_rpc_url()?;
|
||||
let sub = setup_persistence_subscription(ws_url).await?;
|
||||
Some(PersistenceWaiter::with_subscription(
|
||||
sub,
|
||||
self.persistence_threshold,
|
||||
@@ -245,17 +242,20 @@ impl Command {
|
||||
results.into_iter().unzip();
|
||||
|
||||
if let Some(ref path) = self.benchmark.output {
|
||||
write_benchmark_results(path, &gas_output_results, combined_results)?;
|
||||
write_benchmark_results(path, &gas_output_results, &combined_results)?;
|
||||
}
|
||||
|
||||
let gas_output = TotalGasOutput::new(gas_output_results)?;
|
||||
let gas_output =
|
||||
TotalGasOutput::with_combined_results(gas_output_results, &combined_results)?;
|
||||
|
||||
info!(
|
||||
total_duration=?gas_output.total_duration,
|
||||
total_gas_used=?gas_output.total_gas_used,
|
||||
blocks_processed=?gas_output.blocks_processed,
|
||||
"Total Ggas/s: {:.4}",
|
||||
gas_output.total_gigagas_per_second()
|
||||
total_gas_used = gas_output.total_gas_used,
|
||||
total_duration = ?gas_output.total_duration,
|
||||
execution_duration = ?gas_output.execution_duration,
|
||||
blocks_processed = gas_output.blocks_processed,
|
||||
wall_clock_ggas_per_second = format_args!("{:.4}", gas_output.total_gigagas_per_second()),
|
||||
execution_ggas_per_second = format_args!("{:.4}", gas_output.execution_gigagas_per_second()),
|
||||
"Benchmark complete"
|
||||
);
|
||||
|
||||
Ok(())
|
||||
@@ -289,249 +289,4 @@ impl Command {
|
||||
Ok(derived)
|
||||
}
|
||||
}
|
||||
|
||||
/// Establishes a websocket connection and subscribes to `reth_subscribePersistedBlock`.
|
||||
async fn setup_persistence_subscription(&self) -> eyre::Result<PersistenceSubscription> {
|
||||
let ws_url = self.derive_ws_rpc_url()?;
|
||||
|
||||
info!("Connecting to WebSocket at {} for persistence subscription", ws_url);
|
||||
|
||||
let ws_connect = WsConnect::new(ws_url.to_string());
|
||||
let client = RpcClient::connect_pubsub(ws_connect)
|
||||
.await
|
||||
.wrap_err("Failed to connect to WebSocket RPC endpoint")?;
|
||||
let provider: RootProvider<Ethereum> = RootProvider::new(client);
|
||||
|
||||
let subscription = provider
|
||||
.subscribe_to::<BlockNumHash>("reth_subscribePersistedBlock")
|
||||
.await
|
||||
.wrap_err("Failed to subscribe to persistence notifications")?;
|
||||
|
||||
info!("Subscribed to persistence notifications");
|
||||
|
||||
Ok(PersistenceSubscription::new(provider, subscription.into_stream()))
|
||||
}
|
||||
}
|
||||
|
||||
/// Converts an engine API URL to the default RPC websocket URL.
|
||||
///
|
||||
/// Transformations:
|
||||
/// - `http` → `ws`
|
||||
/// - `https` → `wss`
|
||||
/// - `ws` / `wss` keep their scheme
|
||||
/// - Port is always set to `8546`, reth's default RPC websocket port.
|
||||
///
|
||||
/// This is used when we only know the engine API URL (typically `:8551`) but
|
||||
/// need to connect to the node's WS RPC endpoint for persistence events.
|
||||
fn engine_url_to_ws_url(engine_url: &str) -> eyre::Result<Url> {
|
||||
let url: Url = engine_url
|
||||
.parse()
|
||||
.wrap_err_with(|| format!("Failed to parse engine RPC URL: {engine_url}"))?;
|
||||
|
||||
let mut ws_url = url.clone();
|
||||
|
||||
match ws_url.scheme() {
|
||||
"http" => ws_url
|
||||
.set_scheme("ws")
|
||||
.map_err(|_| eyre::eyre!("Failed to set WS scheme for URL: {url}"))?,
|
||||
"https" => ws_url
|
||||
.set_scheme("wss")
|
||||
.map_err(|_| eyre::eyre!("Failed to set WSS scheme for URL: {url}"))?,
|
||||
"ws" | "wss" => {}
|
||||
scheme => {
|
||||
return Err(eyre::eyre!(
|
||||
"Unsupported URL scheme '{scheme}' for URL: {url}. Expected http, https, ws, or wss."
|
||||
))
|
||||
}
|
||||
}
|
||||
|
||||
ws_url.set_port(Some(8546)).map_err(|_| eyre::eyre!("Failed to set port for URL: {url}"))?;
|
||||
|
||||
Ok(ws_url)
|
||||
}
|
||||
|
||||
/// Waits until the persistence subscription reports that `target` has been persisted.
|
||||
///
|
||||
/// Consumes subscription events until `last_persisted >= target`, or returns an error if:
|
||||
/// - the subscription stream ends unexpectedly, or
|
||||
/// - `timeout` elapses before `target` is observed.
|
||||
async fn wait_for_persistence(
|
||||
stream: &mut SubscriptionStream<BlockNumHash>,
|
||||
target: u64,
|
||||
last_persisted: &mut u64,
|
||||
timeout: Duration,
|
||||
) -> eyre::Result<()> {
|
||||
tokio::time::timeout(timeout, async {
|
||||
while *last_persisted < target {
|
||||
match stream.next().await {
|
||||
Some(persisted) => {
|
||||
*last_persisted = persisted.number;
|
||||
debug!(
|
||||
target: "reth-bench",
|
||||
persisted_block = ?last_persisted,
|
||||
"Received persistence notification"
|
||||
);
|
||||
}
|
||||
None => {
|
||||
return Err(eyre::eyre!("Persistence subscription closed unexpectedly"));
|
||||
}
|
||||
}
|
||||
}
|
||||
Ok(())
|
||||
})
|
||||
.await
|
||||
.map_err(|_| {
|
||||
eyre::eyre!(
|
||||
"Persistence timeout: target block {} not persisted within {:?}. Last persisted: {}",
|
||||
target,
|
||||
timeout,
|
||||
last_persisted
|
||||
)
|
||||
})?
|
||||
}
|
||||
|
||||
/// Wrapper that keeps both the subscription stream and the underlying provider alive.
|
||||
/// The provider must be kept alive for the subscription to continue receiving events.
|
||||
struct PersistenceSubscription {
|
||||
_provider: RootProvider<Ethereum>,
|
||||
stream: SubscriptionStream<BlockNumHash>,
|
||||
}
|
||||
|
||||
impl PersistenceSubscription {
|
||||
const fn new(
|
||||
provider: RootProvider<Ethereum>,
|
||||
stream: SubscriptionStream<BlockNumHash>,
|
||||
) -> Self {
|
||||
Self { _provider: provider, stream }
|
||||
}
|
||||
|
||||
const fn stream_mut(&mut self) -> &mut SubscriptionStream<BlockNumHash> {
|
||||
&mut self.stream
|
||||
}
|
||||
}
|
||||
|
||||
/// Encapsulates the block waiting logic.
|
||||
///
|
||||
/// Provides a simple `on_block()` interface that handles both:
|
||||
/// - Fixed duration waits (when `wait_time` is set)
|
||||
/// - Persistence-based waits (when `subscription` is set)
|
||||
///
|
||||
/// For persistence mode, waits after every `(threshold + 1)` blocks.
|
||||
struct PersistenceWaiter {
|
||||
wait_time: Option<Duration>,
|
||||
subscription: Option<PersistenceSubscription>,
|
||||
blocks_sent: u64,
|
||||
last_persisted: u64,
|
||||
threshold: u64,
|
||||
timeout: Duration,
|
||||
}
|
||||
|
||||
impl PersistenceWaiter {
|
||||
const fn with_duration(wait_time: Duration) -> Self {
|
||||
Self {
|
||||
wait_time: Some(wait_time),
|
||||
subscription: None,
|
||||
blocks_sent: 0,
|
||||
last_persisted: 0,
|
||||
threshold: 0,
|
||||
timeout: Duration::ZERO,
|
||||
}
|
||||
}
|
||||
|
||||
const fn with_subscription(
|
||||
subscription: PersistenceSubscription,
|
||||
threshold: u64,
|
||||
timeout: Duration,
|
||||
) -> Self {
|
||||
Self {
|
||||
wait_time: None,
|
||||
subscription: Some(subscription),
|
||||
blocks_sent: 0,
|
||||
last_persisted: 0,
|
||||
threshold,
|
||||
timeout,
|
||||
}
|
||||
}
|
||||
|
||||
/// Called once per block. Waits based on the configured mode.
|
||||
#[allow(clippy::manual_is_multiple_of)]
|
||||
async fn on_block(&mut self, block_number: u64) -> eyre::Result<()> {
|
||||
if let Some(wait_time) = self.wait_time {
|
||||
tokio::time::sleep(wait_time).await;
|
||||
return Ok(());
|
||||
}
|
||||
|
||||
let Some(ref mut subscription) = self.subscription else {
|
||||
return Ok(());
|
||||
};
|
||||
|
||||
self.blocks_sent += 1;
|
||||
|
||||
if self.blocks_sent % (self.threshold + 1) == 0 {
|
||||
debug!(
|
||||
target: "reth-bench",
|
||||
target_block = ?block_number,
|
||||
last_persisted = self.last_persisted,
|
||||
blocks_sent = self.blocks_sent,
|
||||
"Waiting for persistence"
|
||||
);
|
||||
|
||||
wait_for_persistence(
|
||||
subscription.stream_mut(),
|
||||
block_number,
|
||||
&mut self.last_persisted,
|
||||
self.timeout,
|
||||
)
|
||||
.await?;
|
||||
|
||||
debug!(
|
||||
target: "reth-bench",
|
||||
persisted = self.last_persisted,
|
||||
"Persistence caught up"
|
||||
);
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::*;
|
||||
|
||||
#[test]
|
||||
fn test_engine_url_to_ws_url() {
|
||||
// http -> ws, always uses port 8546
|
||||
let result = engine_url_to_ws_url("http://localhost:8551").unwrap();
|
||||
assert_eq!(result.as_str(), "ws://localhost:8546/");
|
||||
|
||||
// https -> wss
|
||||
let result = engine_url_to_ws_url("https://localhost:8551").unwrap();
|
||||
assert_eq!(result.as_str(), "wss://localhost:8546/");
|
||||
|
||||
// Custom engine port still maps to 8546
|
||||
let result = engine_url_to_ws_url("http://localhost:9551").unwrap();
|
||||
assert_eq!(result.port(), Some(8546));
|
||||
|
||||
// Already ws passthrough
|
||||
let result = engine_url_to_ws_url("ws://localhost:8546").unwrap();
|
||||
assert_eq!(result.scheme(), "ws");
|
||||
|
||||
// Invalid inputs
|
||||
assert!(engine_url_to_ws_url("ftp://localhost:8551").is_err());
|
||||
assert!(engine_url_to_ws_url("not a valid url").is_err());
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_waiter_with_duration() {
|
||||
let mut waiter = PersistenceWaiter::with_duration(Duration::from_millis(1));
|
||||
|
||||
let start = Instant::now();
|
||||
waiter.on_block(1).await.unwrap();
|
||||
waiter.on_block(2).await.unwrap();
|
||||
waiter.on_block(3).await.unwrap();
|
||||
|
||||
// Should have waited ~3ms total
|
||||
assert!(start.elapsed() >= Duration::from_millis(3));
|
||||
}
|
||||
}
|
||||
|
||||
@@ -6,7 +6,7 @@ use csv::Writer;
|
||||
use eyre::OptionExt;
|
||||
use reth_primitives_traits::constants::GIGAGAS;
|
||||
use serde::{ser::SerializeStruct, Deserialize, Serialize};
|
||||
use std::{path::Path, time::Duration};
|
||||
use std::{fs, path::Path, time::Duration};
|
||||
use tracing::info;
|
||||
|
||||
/// This is the suffix for gas output csv files.
|
||||
@@ -158,29 +158,58 @@ pub(crate) struct TotalGasRow {
|
||||
pub(crate) struct TotalGasOutput {
|
||||
/// The total gas used in the benchmark.
|
||||
pub(crate) total_gas_used: u64,
|
||||
/// The total duration of the benchmark.
|
||||
/// The total wall-clock duration of the benchmark (includes wait times).
|
||||
pub(crate) total_duration: Duration,
|
||||
/// The total gas used per second.
|
||||
pub(crate) total_gas_per_second: f64,
|
||||
/// The total execution-only duration (excludes wait times).
|
||||
pub(crate) execution_duration: Duration,
|
||||
/// The number of blocks processed.
|
||||
pub(crate) blocks_processed: u64,
|
||||
}
|
||||
|
||||
impl TotalGasOutput {
|
||||
/// Create a new [`TotalGasOutput`] from a list of [`TotalGasRow`].
|
||||
/// Create a new [`TotalGasOutput`] from gas rows only.
|
||||
///
|
||||
/// Use this when execution-only timing is not available (e.g., `new_payload_only`).
|
||||
/// `execution_duration` will equal `total_duration`.
|
||||
pub(crate) fn new(rows: Vec<TotalGasRow>) -> eyre::Result<Self> {
|
||||
// the duration is obtained from the last row
|
||||
let total_duration = rows.last().map(|row| row.time).ok_or_eyre("empty results")?;
|
||||
let blocks_processed = rows.len() as u64;
|
||||
let total_gas_used: u64 = rows.into_iter().map(|row| row.gas_used).sum();
|
||||
let total_gas_per_second = total_gas_used as f64 / total_duration.as_secs_f64();
|
||||
|
||||
Ok(Self { total_gas_used, total_duration, total_gas_per_second, blocks_processed })
|
||||
Ok(Self {
|
||||
total_gas_used,
|
||||
total_duration,
|
||||
execution_duration: total_duration,
|
||||
blocks_processed,
|
||||
})
|
||||
}
|
||||
|
||||
/// Return the total gigagas per second.
|
||||
/// Create a new [`TotalGasOutput`] from gas rows and combined results.
|
||||
///
|
||||
/// - `rows`: Used for total gas and wall-clock duration
|
||||
/// - `combined_results`: Used for execution-only duration (sum of `total_latency`)
|
||||
pub(crate) fn with_combined_results(
|
||||
rows: Vec<TotalGasRow>,
|
||||
combined_results: &[CombinedResult],
|
||||
) -> eyre::Result<Self> {
|
||||
let total_duration = rows.last().map(|row| row.time).ok_or_eyre("empty results")?;
|
||||
let blocks_processed = rows.len() as u64;
|
||||
let total_gas_used: u64 = rows.into_iter().map(|row| row.gas_used).sum();
|
||||
|
||||
// Sum execution-only time from combined results
|
||||
let execution_duration: Duration = combined_results.iter().map(|r| r.total_latency).sum();
|
||||
|
||||
Ok(Self { total_gas_used, total_duration, execution_duration, blocks_processed })
|
||||
}
|
||||
|
||||
/// Return the total gigagas per second based on wall-clock time.
|
||||
pub(crate) fn total_gigagas_per_second(&self) -> f64 {
|
||||
self.total_gas_per_second / GIGAGAS as f64
|
||||
self.total_gas_used as f64 / self.total_duration.as_secs_f64() / GIGAGAS as f64
|
||||
}
|
||||
|
||||
/// Return the execution-only gigagas per second (excludes wait times).
|
||||
pub(crate) fn execution_gigagas_per_second(&self) -> f64 {
|
||||
self.total_gas_used as f64 / self.execution_duration.as_secs_f64() / GIGAGAS as f64
|
||||
}
|
||||
}
|
||||
|
||||
@@ -192,8 +221,10 @@ impl TotalGasOutput {
|
||||
pub(crate) fn write_benchmark_results(
|
||||
output_dir: &Path,
|
||||
gas_results: &[TotalGasRow],
|
||||
combined_results: Vec<CombinedResult>,
|
||||
combined_results: &[CombinedResult],
|
||||
) -> eyre::Result<()> {
|
||||
fs::create_dir_all(output_dir)?;
|
||||
|
||||
let output_path = output_dir.join(COMBINED_OUTPUT_SUFFIX);
|
||||
info!("Writing engine api call latency output to file: {:?}", output_path);
|
||||
let mut writer = Writer::from_path(&output_path)?;
|
||||
|
||||
267
bin/reth-bench/src/bench/persistence_waiter.rs
Normal file
267
bin/reth-bench/src/bench/persistence_waiter.rs
Normal file
@@ -0,0 +1,267 @@
|
||||
//! Persistence waiting utilities for benchmarks.
|
||||
//!
|
||||
//! Provides waiting behavior to control benchmark pacing:
|
||||
//! - **Fixed duration waits**: Sleep for a fixed time between blocks
|
||||
//! - **Persistence-based waits**: Wait for blocks to be persisted using
|
||||
//! `reth_subscribePersistedBlock` subscription
|
||||
|
||||
use alloy_eips::BlockNumHash;
|
||||
use alloy_network::Ethereum;
|
||||
use alloy_provider::{Provider, RootProvider};
|
||||
use alloy_pubsub::SubscriptionStream;
|
||||
use alloy_rpc_client::RpcClient;
|
||||
use alloy_transport_ws::WsConnect;
|
||||
use eyre::Context;
|
||||
use futures::StreamExt;
|
||||
use std::time::Duration;
|
||||
use tracing::{debug, info};
|
||||
use url::Url;
|
||||
|
||||
/// Default timeout for waiting on persistence.
|
||||
pub(crate) const PERSISTENCE_CHECKPOINT_TIMEOUT: Duration = Duration::from_secs(60);
|
||||
|
||||
/// Converts an engine API URL to the default RPC websocket URL.
|
||||
///
|
||||
/// Transformations:
|
||||
/// - `http` → `ws`
|
||||
/// - `https` → `wss`
|
||||
/// - `ws` / `wss` keep their scheme
|
||||
/// - Port is always set to `8546`, reth's default RPC websocket port.
|
||||
///
|
||||
/// This is used when we only know the engine API URL (typically `:8551`) but
|
||||
/// need to connect to the node's WS RPC endpoint for persistence events.
|
||||
pub(crate) fn engine_url_to_ws_url(engine_url: &str) -> eyre::Result<Url> {
|
||||
let url: Url = engine_url
|
||||
.parse()
|
||||
.wrap_err_with(|| format!("Failed to parse engine RPC URL: {engine_url}"))?;
|
||||
|
||||
let mut ws_url = url.clone();
|
||||
|
||||
match ws_url.scheme() {
|
||||
"http" => ws_url
|
||||
.set_scheme("ws")
|
||||
.map_err(|_| eyre::eyre!("Failed to set WS scheme for URL: {url}"))?,
|
||||
"https" => ws_url
|
||||
.set_scheme("wss")
|
||||
.map_err(|_| eyre::eyre!("Failed to set WSS scheme for URL: {url}"))?,
|
||||
"ws" | "wss" => {}
|
||||
scheme => {
|
||||
return Err(eyre::eyre!(
|
||||
"Unsupported URL scheme '{scheme}' for URL: {url}. Expected http, https, ws, or wss."
|
||||
))
|
||||
}
|
||||
}
|
||||
|
||||
ws_url.set_port(Some(8546)).map_err(|_| eyre::eyre!("Failed to set port for URL: {url}"))?;
|
||||
|
||||
Ok(ws_url)
|
||||
}
|
||||
|
||||
/// Waits until the persistence subscription reports that `target` has been persisted.
|
||||
///
|
||||
/// Consumes subscription events until `last_persisted >= target`, or returns an error if:
|
||||
/// - the subscription stream ends unexpectedly, or
|
||||
/// - `timeout` elapses before `target` is observed.
|
||||
async fn wait_for_persistence(
|
||||
stream: &mut SubscriptionStream<BlockNumHash>,
|
||||
target: u64,
|
||||
last_persisted: &mut u64,
|
||||
timeout: Duration,
|
||||
) -> eyre::Result<()> {
|
||||
tokio::time::timeout(timeout, async {
|
||||
while *last_persisted < target {
|
||||
match stream.next().await {
|
||||
Some(persisted) => {
|
||||
*last_persisted = persisted.number;
|
||||
debug!(
|
||||
target: "reth-bench",
|
||||
persisted_block = ?last_persisted,
|
||||
"Received persistence notification"
|
||||
);
|
||||
}
|
||||
None => {
|
||||
return Err(eyre::eyre!("Persistence subscription closed unexpectedly"));
|
||||
}
|
||||
}
|
||||
}
|
||||
Ok(())
|
||||
})
|
||||
.await
|
||||
.map_err(|_| {
|
||||
eyre::eyre!(
|
||||
"Persistence timeout: target block {} not persisted within {:?}. Last persisted: {}",
|
||||
target,
|
||||
timeout,
|
||||
last_persisted
|
||||
)
|
||||
})?
|
||||
}
|
||||
|
||||
/// Wrapper that keeps both the subscription stream and the underlying provider alive.
|
||||
/// The provider must be kept alive for the subscription to continue receiving events.
|
||||
pub(crate) struct PersistenceSubscription {
|
||||
_provider: RootProvider<Ethereum>,
|
||||
stream: SubscriptionStream<BlockNumHash>,
|
||||
}
|
||||
|
||||
impl PersistenceSubscription {
|
||||
const fn new(
|
||||
provider: RootProvider<Ethereum>,
|
||||
stream: SubscriptionStream<BlockNumHash>,
|
||||
) -> Self {
|
||||
Self { _provider: provider, stream }
|
||||
}
|
||||
|
||||
const fn stream_mut(&mut self) -> &mut SubscriptionStream<BlockNumHash> {
|
||||
&mut self.stream
|
||||
}
|
||||
}
|
||||
|
||||
/// Establishes a websocket connection and subscribes to `reth_subscribePersistedBlock`.
|
||||
pub(crate) async fn setup_persistence_subscription(
|
||||
ws_url: Url,
|
||||
) -> eyre::Result<PersistenceSubscription> {
|
||||
info!("Connecting to WebSocket at {} for persistence subscription", ws_url);
|
||||
|
||||
let ws_connect = WsConnect::new(ws_url.to_string());
|
||||
let client = RpcClient::connect_pubsub(ws_connect)
|
||||
.await
|
||||
.wrap_err("Failed to connect to WebSocket RPC endpoint")?;
|
||||
let provider: RootProvider<Ethereum> = RootProvider::new(client);
|
||||
|
||||
let subscription = provider
|
||||
.subscribe_to::<BlockNumHash>("reth_subscribePersistedBlock")
|
||||
.await
|
||||
.wrap_err("Failed to subscribe to persistence notifications")?;
|
||||
|
||||
info!("Subscribed to persistence notifications");
|
||||
|
||||
Ok(PersistenceSubscription::new(provider, subscription.into_stream()))
|
||||
}
|
||||
|
||||
/// Encapsulates the block waiting logic.
|
||||
///
|
||||
/// Provides a simple `on_block()` interface that handles both:
|
||||
/// - Fixed duration waits (when `wait_time` is set)
|
||||
/// - Persistence-based waits (when `subscription` is set)
|
||||
///
|
||||
/// For persistence mode, waits after every `(threshold + 1)` blocks.
|
||||
pub(crate) struct PersistenceWaiter {
|
||||
wait_time: Option<Duration>,
|
||||
subscription: Option<PersistenceSubscription>,
|
||||
blocks_sent: u64,
|
||||
last_persisted: u64,
|
||||
threshold: u64,
|
||||
timeout: Duration,
|
||||
}
|
||||
|
||||
impl PersistenceWaiter {
|
||||
pub(crate) const fn with_duration(wait_time: Duration) -> Self {
|
||||
Self {
|
||||
wait_time: Some(wait_time),
|
||||
subscription: None,
|
||||
blocks_sent: 0,
|
||||
last_persisted: 0,
|
||||
threshold: 0,
|
||||
timeout: Duration::ZERO,
|
||||
}
|
||||
}
|
||||
|
||||
pub(crate) const fn with_subscription(
|
||||
subscription: PersistenceSubscription,
|
||||
threshold: u64,
|
||||
timeout: Duration,
|
||||
) -> Self {
|
||||
Self {
|
||||
wait_time: None,
|
||||
subscription: Some(subscription),
|
||||
blocks_sent: 0,
|
||||
last_persisted: 0,
|
||||
threshold,
|
||||
timeout,
|
||||
}
|
||||
}
|
||||
|
||||
/// Called once per block. Waits based on the configured mode.
|
||||
#[allow(clippy::manual_is_multiple_of)]
|
||||
pub(crate) async fn on_block(&mut self, block_number: u64) -> eyre::Result<()> {
|
||||
if let Some(wait_time) = self.wait_time {
|
||||
tokio::time::sleep(wait_time).await;
|
||||
return Ok(());
|
||||
}
|
||||
|
||||
let Some(ref mut subscription) = self.subscription else {
|
||||
return Ok(());
|
||||
};
|
||||
|
||||
self.blocks_sent += 1;
|
||||
|
||||
if self.blocks_sent % (self.threshold + 1) == 0 {
|
||||
debug!(
|
||||
target: "reth-bench",
|
||||
target_block = ?block_number,
|
||||
last_persisted = self.last_persisted,
|
||||
blocks_sent = self.blocks_sent,
|
||||
"Waiting for persistence"
|
||||
);
|
||||
|
||||
wait_for_persistence(
|
||||
subscription.stream_mut(),
|
||||
block_number,
|
||||
&mut self.last_persisted,
|
||||
self.timeout,
|
||||
)
|
||||
.await?;
|
||||
|
||||
debug!(
|
||||
target: "reth-bench",
|
||||
persisted = self.last_persisted,
|
||||
"Persistence caught up"
|
||||
);
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::*;
|
||||
use std::time::Instant;
|
||||
|
||||
#[test]
|
||||
fn test_engine_url_to_ws_url() {
|
||||
// http -> ws, always uses port 8546
|
||||
let result = engine_url_to_ws_url("http://localhost:8551").unwrap();
|
||||
assert_eq!(result.as_str(), "ws://localhost:8546/");
|
||||
|
||||
// https -> wss
|
||||
let result = engine_url_to_ws_url("https://localhost:8551").unwrap();
|
||||
assert_eq!(result.as_str(), "wss://localhost:8546/");
|
||||
|
||||
// Custom engine port still maps to 8546
|
||||
let result = engine_url_to_ws_url("http://localhost:9551").unwrap();
|
||||
assert_eq!(result.port(), Some(8546));
|
||||
|
||||
// Already ws passthrough
|
||||
let result = engine_url_to_ws_url("ws://localhost:8546").unwrap();
|
||||
assert_eq!(result.scheme(), "ws");
|
||||
|
||||
// Invalid inputs
|
||||
assert!(engine_url_to_ws_url("ftp://localhost:8551").is_err());
|
||||
assert!(engine_url_to_ws_url("not a valid url").is_err());
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_waiter_with_duration() {
|
||||
let mut waiter = PersistenceWaiter::with_duration(Duration::from_millis(1));
|
||||
|
||||
let start = Instant::now();
|
||||
waiter.on_block(1).await.unwrap();
|
||||
waiter.on_block(2).await.unwrap();
|
||||
waiter.on_block(3).await.unwrap();
|
||||
|
||||
// Should have waited ~3ms total
|
||||
assert!(start.elapsed() >= Duration::from_millis(3));
|
||||
}
|
||||
}
|
||||
@@ -2,10 +2,27 @@
|
||||
//!
|
||||
//! This command reads `ExecutionPayloadEnvelopeV4` files from a directory and replays them
|
||||
//! in sequence using `newPayload` followed by `forkchoiceUpdated`.
|
||||
//!
|
||||
//! Supports configurable waiting behavior:
|
||||
//! - **`--wait-time`**: Fixed sleep interval between blocks.
|
||||
//! - **`--wait-for-persistence`**: Waits for every Nth block to be persisted using the
|
||||
//! `reth_subscribePersistedBlock` subscription, where N matches the engine's persistence
|
||||
//! threshold. This ensures the benchmark doesn't outpace persistence.
|
||||
//!
|
||||
//! Both options can be used together or independently.
|
||||
|
||||
use crate::{
|
||||
authenticated_transport::AuthenticatedTransportConnect,
|
||||
bench::output::GasRampPayloadFile,
|
||||
bench::{
|
||||
output::{
|
||||
write_benchmark_results, CombinedResult, GasRampPayloadFile, NewPayloadResult,
|
||||
TotalGasOutput, TotalGasRow,
|
||||
},
|
||||
persistence_waiter::{
|
||||
engine_url_to_ws_url, setup_persistence_subscription, PersistenceWaiter,
|
||||
PERSISTENCE_CHECKPOINT_TIMEOUT,
|
||||
},
|
||||
},
|
||||
valid_payload::{call_forkchoice_updated, call_new_payload},
|
||||
};
|
||||
use alloy_primitives::B256;
|
||||
@@ -14,11 +31,16 @@ use alloy_rpc_client::ClientBuilder;
|
||||
use alloy_rpc_types_engine::{ExecutionPayloadEnvelopeV4, ForkchoiceState, JwtSecret};
|
||||
use clap::Parser;
|
||||
use eyre::Context;
|
||||
use reqwest::Url;
|
||||
use humantime::parse_duration;
|
||||
use reth_cli_runner::CliContext;
|
||||
use reth_engine_primitives::config::DEFAULT_PERSISTENCE_THRESHOLD;
|
||||
use reth_node_api::EngineApiMessageVersion;
|
||||
use std::path::PathBuf;
|
||||
use std::{
|
||||
path::PathBuf,
|
||||
time::{Duration, Instant},
|
||||
};
|
||||
use tracing::{debug, info};
|
||||
use url::Url;
|
||||
|
||||
/// `reth bench replay-payloads` command
|
||||
///
|
||||
@@ -51,6 +73,42 @@ pub struct Command {
|
||||
/// These are replayed before the main payloads to warm up the gas limit.
|
||||
#[arg(long, value_name = "GAS_RAMP_DIR")]
|
||||
gas_ramp_dir: Option<PathBuf>,
|
||||
|
||||
/// Optional output directory for benchmark results (CSV files).
|
||||
#[arg(long, value_name = "OUTPUT")]
|
||||
output: Option<PathBuf>,
|
||||
|
||||
/// How long to wait after a forkchoice update before sending the next payload.
|
||||
#[arg(long, value_name = "WAIT_TIME", value_parser = parse_duration, verbatim_doc_comment)]
|
||||
wait_time: Option<Duration>,
|
||||
|
||||
/// Wait for blocks to be persisted before sending the next batch.
|
||||
///
|
||||
/// When enabled, waits for every Nth block to be persisted using the
|
||||
/// `reth_subscribePersistedBlock` subscription. This ensures the benchmark
|
||||
/// doesn't outpace persistence.
|
||||
///
|
||||
/// The subscription uses the regular RPC websocket endpoint (no JWT required).
|
||||
#[arg(long, default_value = "false", verbatim_doc_comment)]
|
||||
wait_for_persistence: bool,
|
||||
|
||||
/// Engine persistence threshold used for deciding when to wait for persistence.
|
||||
///
|
||||
/// The benchmark waits after every `(threshold + 1)` blocks. By default this
|
||||
/// matches the engine's `DEFAULT_PERSISTENCE_THRESHOLD` (2), so waits occur
|
||||
/// at blocks 3, 6, 9, etc.
|
||||
#[arg(
|
||||
long = "persistence-threshold",
|
||||
value_name = "PERSISTENCE_THRESHOLD",
|
||||
default_value_t = DEFAULT_PERSISTENCE_THRESHOLD,
|
||||
verbatim_doc_comment
|
||||
)]
|
||||
persistence_threshold: u64,
|
||||
|
||||
/// Optional `WebSocket` RPC URL for persistence subscription.
|
||||
/// If not provided, derives from engine RPC URL by changing scheme to ws and port to 8546.
|
||||
#[arg(long, value_name = "WS_RPC_URL", verbatim_doc_comment)]
|
||||
ws_rpc_url: Option<String>,
|
||||
}
|
||||
|
||||
/// A loaded payload ready for execution.
|
||||
@@ -78,6 +136,33 @@ impl Command {
|
||||
pub async fn execute(self, _ctx: CliContext) -> eyre::Result<()> {
|
||||
info!(payload_dir = %self.payload_dir.display(), "Replaying payloads");
|
||||
|
||||
// Log mode configuration
|
||||
if let Some(duration) = self.wait_time {
|
||||
info!("Using wait-time mode with {}ms delay between blocks", duration.as_millis());
|
||||
}
|
||||
if self.wait_for_persistence {
|
||||
info!(
|
||||
"Persistence waiting enabled (waits after every {} blocks to match engine gap > {} behavior)",
|
||||
self.persistence_threshold + 1,
|
||||
self.persistence_threshold
|
||||
);
|
||||
}
|
||||
|
||||
// Set up waiter based on configured options (duration takes precedence)
|
||||
let mut waiter = match (self.wait_time, self.wait_for_persistence) {
|
||||
(Some(duration), _) => Some(PersistenceWaiter::with_duration(duration)),
|
||||
(None, true) => {
|
||||
let ws_url = self.derive_ws_rpc_url()?;
|
||||
let sub = setup_persistence_subscription(ws_url).await?;
|
||||
Some(PersistenceWaiter::with_subscription(
|
||||
sub,
|
||||
self.persistence_threshold,
|
||||
PERSISTENCE_CHECKPOINT_TIMEOUT,
|
||||
))
|
||||
}
|
||||
(None, false) => None,
|
||||
};
|
||||
|
||||
// Set up authenticated engine provider
|
||||
let jwt =
|
||||
std::fs::read_to_string(&self.jwt_secret).wrap_err("Failed to read JWT secret file")?;
|
||||
@@ -144,6 +229,11 @@ impl Command {
|
||||
call_forkchoice_updated(&auth_provider, payload.version, fcu_state, None).await?;
|
||||
|
||||
info!(gas_ramp_payload = i + 1, "Gas ramp payload executed successfully");
|
||||
|
||||
if let Some(w) = &mut waiter {
|
||||
w.on_block(payload.block_number).await?;
|
||||
}
|
||||
|
||||
parent_hash = payload.file.block_hash;
|
||||
}
|
||||
|
||||
@@ -151,22 +241,112 @@ impl Command {
|
||||
info!(count = gas_ramp_payloads.len(), "All gas ramp payloads replayed");
|
||||
}
|
||||
|
||||
let mut results = Vec::new();
|
||||
let total_benchmark_duration = Instant::now();
|
||||
|
||||
for (i, payload) in payloads.iter().enumerate() {
|
||||
info!(
|
||||
let envelope = &payload.envelope;
|
||||
let block_hash = payload.block_hash;
|
||||
let execution_payload = &envelope.envelope_inner.execution_payload;
|
||||
let inner_payload = &execution_payload.payload_inner.payload_inner;
|
||||
|
||||
let gas_used = inner_payload.gas_used;
|
||||
let gas_limit = inner_payload.gas_limit;
|
||||
let block_number = inner_payload.block_number;
|
||||
let transaction_count =
|
||||
execution_payload.payload_inner.payload_inner.transactions.len() as u64;
|
||||
|
||||
debug!(
|
||||
payload = i + 1,
|
||||
total = payloads.len(),
|
||||
index = payload.index,
|
||||
block_hash = %payload.block_hash,
|
||||
block_hash = %block_hash,
|
||||
"Executing payload (newPayload + FCU)"
|
||||
);
|
||||
|
||||
self.execute_payload_v4(&auth_provider, &payload.envelope, parent_hash).await?;
|
||||
let start = Instant::now();
|
||||
|
||||
info!(payload = i + 1, "Payload executed successfully");
|
||||
parent_hash = payload.block_hash;
|
||||
debug!(
|
||||
method = "engine_newPayloadV4",
|
||||
block_hash = %block_hash,
|
||||
"Sending newPayload"
|
||||
);
|
||||
|
||||
let status = auth_provider
|
||||
.new_payload_v4(
|
||||
execution_payload.clone(),
|
||||
vec![],
|
||||
B256::ZERO,
|
||||
envelope.execution_requests.to_vec(),
|
||||
)
|
||||
.await?;
|
||||
|
||||
let new_payload_result = NewPayloadResult { gas_used, latency: start.elapsed() };
|
||||
|
||||
if !status.is_valid() {
|
||||
return Err(eyre::eyre!("Payload rejected: {:?}", status));
|
||||
}
|
||||
|
||||
let fcu_state = ForkchoiceState {
|
||||
head_block_hash: block_hash,
|
||||
safe_block_hash: parent_hash,
|
||||
finalized_block_hash: parent_hash,
|
||||
};
|
||||
|
||||
debug!(method = "engine_forkchoiceUpdatedV3", ?fcu_state, "Sending forkchoiceUpdated");
|
||||
|
||||
let fcu_result = auth_provider.fork_choice_updated_v3(fcu_state, None).await?;
|
||||
|
||||
let total_latency = start.elapsed();
|
||||
let fcu_latency = total_latency - new_payload_result.latency;
|
||||
|
||||
let combined_result = CombinedResult {
|
||||
block_number,
|
||||
gas_limit,
|
||||
transaction_count,
|
||||
new_payload_result,
|
||||
fcu_latency,
|
||||
total_latency,
|
||||
};
|
||||
|
||||
let current_duration = total_benchmark_duration.elapsed();
|
||||
info!(%combined_result);
|
||||
|
||||
if let Some(w) = &mut waiter {
|
||||
w.on_block(block_number).await?;
|
||||
}
|
||||
|
||||
let gas_row =
|
||||
TotalGasRow { block_number, transaction_count, gas_used, time: current_duration };
|
||||
results.push((gas_row, combined_result));
|
||||
|
||||
debug!(?status, ?fcu_result, "Payload executed successfully");
|
||||
parent_hash = block_hash;
|
||||
}
|
||||
|
||||
info!(count = payloads.len(), "All payloads replayed successfully");
|
||||
// Drop waiter - we don't need to wait for final blocks to persist
|
||||
// since the benchmark goal is measuring Ggas/s of newPayload/FCU, not persistence.
|
||||
drop(waiter);
|
||||
|
||||
let (gas_output_results, combined_results): (Vec<TotalGasRow>, Vec<CombinedResult>) =
|
||||
results.into_iter().unzip();
|
||||
|
||||
if let Some(ref path) = self.output {
|
||||
write_benchmark_results(path, &gas_output_results, &combined_results)?;
|
||||
}
|
||||
|
||||
let gas_output =
|
||||
TotalGasOutput::with_combined_results(gas_output_results, &combined_results)?;
|
||||
info!(
|
||||
total_gas_used = gas_output.total_gas_used,
|
||||
total_duration = ?gas_output.total_duration,
|
||||
execution_duration = ?gas_output.execution_duration,
|
||||
blocks_processed = gas_output.blocks_processed,
|
||||
wall_clock_ggas_per_second = format_args!("{:.4}", gas_output.total_gigagas_per_second()),
|
||||
execution_ggas_per_second = format_args!("{:.4}", gas_output.execution_gigagas_per_second()),
|
||||
"Benchmark complete"
|
||||
);
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
@@ -285,48 +465,32 @@ impl Command {
|
||||
Ok(payloads)
|
||||
}
|
||||
|
||||
async fn execute_payload_v4(
|
||||
&self,
|
||||
provider: &RootProvider<AnyNetwork>,
|
||||
envelope: &ExecutionPayloadEnvelopeV4,
|
||||
parent_hash: B256,
|
||||
) -> eyre::Result<()> {
|
||||
let block_hash =
|
||||
envelope.envelope_inner.execution_payload.payload_inner.payload_inner.block_hash;
|
||||
|
||||
debug!(
|
||||
method = "engine_newPayloadV4",
|
||||
block_hash = %block_hash,
|
||||
"Sending newPayload"
|
||||
);
|
||||
|
||||
let status = provider
|
||||
.new_payload_v4(
|
||||
envelope.envelope_inner.execution_payload.clone(),
|
||||
vec![],
|
||||
B256::ZERO,
|
||||
envelope.execution_requests.to_vec(),
|
||||
)
|
||||
.await?;
|
||||
|
||||
info!(?status, "newPayloadV4 response");
|
||||
|
||||
if !status.is_valid() {
|
||||
return Err(eyre::eyre!("Payload rejected: {:?}", status));
|
||||
/// Returns the websocket RPC URL used for the persistence subscription.
|
||||
///
|
||||
/// Preference:
|
||||
/// - If `--ws-rpc-url` is provided, use it directly.
|
||||
/// - Otherwise, derive a WS RPC URL from `--engine-rpc-url`.
|
||||
///
|
||||
/// The persistence subscription endpoint (`reth_subscribePersistedBlock`) is exposed on
|
||||
/// the regular RPC server (WS port, usually 8546), not on the engine API port (usually 8551).
|
||||
/// Since we only have the engine URL by default, we convert the scheme
|
||||
/// (http→ws, https→wss) and force the port to 8546.
|
||||
fn derive_ws_rpc_url(&self) -> eyre::Result<Url> {
|
||||
if let Some(ref ws_url) = self.ws_rpc_url {
|
||||
let parsed: Url = ws_url
|
||||
.parse()
|
||||
.wrap_err_with(|| format!("Failed to parse WebSocket RPC URL: {ws_url}"))?;
|
||||
info!(target: "reth-bench", ws_url = %parsed, "Using provided WebSocket RPC URL");
|
||||
Ok(parsed)
|
||||
} else {
|
||||
let derived = engine_url_to_ws_url(&self.engine_rpc_url)?;
|
||||
debug!(
|
||||
target: "reth-bench",
|
||||
engine_url = %self.engine_rpc_url,
|
||||
%derived,
|
||||
"Derived WebSocket RPC URL from engine RPC URL"
|
||||
);
|
||||
Ok(derived)
|
||||
}
|
||||
|
||||
let fcu_state = ForkchoiceState {
|
||||
head_block_hash: block_hash,
|
||||
safe_block_hash: parent_hash,
|
||||
finalized_block_hash: parent_hash,
|
||||
};
|
||||
|
||||
debug!(method = "engine_forkchoiceUpdatedV3", ?fcu_state, "Sending forkchoiceUpdated");
|
||||
|
||||
let fcu_result = provider.fork_choice_updated_v3(fcu_state, None).await?;
|
||||
|
||||
info!(?fcu_result, "forkchoiceUpdatedV3 response");
|
||||
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user