diff --git a/bin/reth-bench/src/bench/new_payload_fcu.rs b/bin/reth-bench/src/bench/new_payload_fcu.rs index 52d39c77d7..821bb7343c 100644 --- a/bin/reth-bench/src/bench/new_payload_fcu.rs +++ b/bin/reth-bench/src/bench/new_payload_fcu.rs @@ -29,6 +29,7 @@ use alloy_provider::{ext::DebugApi, Provider}; use alloy_rpc_types_engine::ForkchoiceState; use clap::Parser; use eyre::{Context, OptionExt}; +use futures::{stream, StreamExt, TryStreamExt}; use reth_cli_runner::CliContext; use reth_engine_primitives::config::DEFAULT_PERSISTENCE_THRESHOLD; use reth_node_core::args::BenchmarkArgs; @@ -151,7 +152,7 @@ impl Command { benchmark_mode, block_provider, auth_provider, - mut next_block, + next_block, is_optimism, use_reth_namespace, rlp_blocks, @@ -167,70 +168,71 @@ impl Command { let buffer_size = self.rpc_block_buffer_size; - // Use a oneshot channel to propagate errors from the spawned task - let (error_sender, mut error_receiver) = tokio::sync::oneshot::channel(); - let (sender, mut receiver) = tokio::sync::mpsc::channel(buffer_size); + let mut blocks = Box::pin( + stream::iter((next_block..) + .take_while(|next_block| { + benchmark_mode.contains(*next_block) + })) + .map(|next_block| { + let block_provider = block_provider.clone(); + async move { + let block_res = block_provider + .get_block_by_number(next_block.into()) + .full() + .await + .wrap_err_with(|| { + format!("Failed to fetch block by number {next_block}") + }); + let block = + match block_res.and_then(|opt| opt.ok_or_eyre("Block not found")) { + Ok(block) => block, + Err(e) => { + tracing::error!(target: "reth-bench", "Failed to fetch block {next_block}: {e}"); + return Err(e) + } + }; - tokio::task::spawn(async move { - while benchmark_mode.contains(next_block) { - let block_res = block_provider - .get_block_by_number(next_block.into()) - .full() - .await - .wrap_err_with(|| format!("Failed to fetch block by number {next_block}")); - let block = match block_res.and_then(|opt| opt.ok_or_eyre("Block not found")) { - Ok(block) => block, - Err(e) => { - tracing::error!(target: "reth-bench", "Failed to fetch block {next_block}: {e}"); - let _ = error_sender.send(e); - break; + let rlp = if rlp_blocks { + let rlp = match block_provider + .debug_get_raw_block(next_block.into()) + .await + { + Ok(rlp) => rlp, + Err(e) => { + tracing::error!(target: "reth-bench", "Failed to fetch raw block {next_block}: {e}"); + return Err(e.into()) + } + }; + Some(rlp) + } else { + None + }; + + let head_block_hash = block.header.hash; + let safe_block_hash = block_provider + .get_block_by_number(block.header.number.saturating_sub(32).into()); + + let finalized_block_hash = block_provider + .get_block_by_number(block.header.number.saturating_sub(64).into()); + + let (safe, finalized) = + tokio::join!(safe_block_hash, finalized_block_hash); + + let safe_block_hash = match safe { + Ok(Some(block)) => block.header.hash, + Ok(None) | Err(_) => head_block_hash, + }; + + let finalized_block_hash = match finalized { + Ok(Some(block)) => block.header.hash, + Ok(None) | Err(_) => head_block_hash, + }; + + Ok((block, head_block_hash, safe_block_hash, finalized_block_hash, rlp)) } - }; - - let rlp = if rlp_blocks { - let rlp = match block_provider.debug_get_raw_block(next_block.into()).await { - Ok(rlp) => rlp, - Err(e) => { - tracing::error!(target: "reth-bench", "Failed to fetch raw block {next_block}: {e}"); - let _ = error_sender - .send(eyre::eyre!("Failed to fetch raw block {next_block}: {e}")); - break; - } - }; - Some(rlp) - } else { - None - }; - - let head_block_hash = block.header.hash; - let safe_block_hash = block_provider - .get_block_by_number(block.header.number.saturating_sub(32).into()); - - let finalized_block_hash = block_provider - .get_block_by_number(block.header.number.saturating_sub(64).into()); - - let (safe, finalized) = tokio::join!(safe_block_hash, finalized_block_hash,); - - let safe_block_hash = match safe { - Ok(Some(block)) => block.header.hash, - Ok(None) | Err(_) => head_block_hash, - }; - - let finalized_block_hash = match finalized { - Ok(Some(block)) => block.header.hash, - Ok(None) | Err(_) => head_block_hash, - }; - - next_block += 1; - if let Err(e) = sender - .send((block, head_block_hash, safe_block_hash, finalized_block_hash, rlp)) - .await - { - tracing::error!(target: "reth-bench", "Failed to send block data: {e}"); - break; - } - } - }); + }) + .buffered(buffer_size), + ); let mut results = Vec::new(); let mut blocks_processed = 0u64; @@ -239,7 +241,7 @@ impl Command { while let Some((block, head, safe, finalized, rlp)) = { let wait_start = Instant::now(); - let result = receiver.recv().await; + let result = blocks.try_next().await?; total_wait_time += wait_start.elapsed(); result } { @@ -324,11 +326,6 @@ impl Command { results.push((gas_row, combined_result)); } - // Check if the spawned task encountered an error - if let Ok(error) = error_receiver.try_recv() { - return Err(error); - } - // Drop waiter - we don't need to wait for final blocks to persist // since the benchmark goal is measuring Ggas/s of newPayload/FCU, not persistence. drop(waiter);