mirror of
https://github.com/paradigmxyz/reth.git
synced 2026-04-30 03:01:58 -04:00
perf(reth-bench): fetch RPC blocks in parallel (#23117)
This commit is contained in:
@@ -29,6 +29,7 @@ use alloy_provider::{ext::DebugApi, Provider};
|
||||
use alloy_rpc_types_engine::ForkchoiceState;
|
||||
use clap::Parser;
|
||||
use eyre::{Context, OptionExt};
|
||||
use futures::{stream, StreamExt, TryStreamExt};
|
||||
use reth_cli_runner::CliContext;
|
||||
use reth_engine_primitives::config::DEFAULT_PERSISTENCE_THRESHOLD;
|
||||
use reth_node_core::args::BenchmarkArgs;
|
||||
@@ -151,7 +152,7 @@ impl Command {
|
||||
benchmark_mode,
|
||||
block_provider,
|
||||
auth_provider,
|
||||
mut next_block,
|
||||
next_block,
|
||||
is_optimism,
|
||||
use_reth_namespace,
|
||||
rlp_blocks,
|
||||
@@ -167,70 +168,71 @@ impl Command {
|
||||
|
||||
let buffer_size = self.rpc_block_buffer_size;
|
||||
|
||||
// Use a oneshot channel to propagate errors from the spawned task
|
||||
let (error_sender, mut error_receiver) = tokio::sync::oneshot::channel();
|
||||
let (sender, mut receiver) = tokio::sync::mpsc::channel(buffer_size);
|
||||
let mut blocks = Box::pin(
|
||||
stream::iter((next_block..)
|
||||
.take_while(|next_block| {
|
||||
benchmark_mode.contains(*next_block)
|
||||
}))
|
||||
.map(|next_block| {
|
||||
let block_provider = block_provider.clone();
|
||||
async move {
|
||||
let block_res = block_provider
|
||||
.get_block_by_number(next_block.into())
|
||||
.full()
|
||||
.await
|
||||
.wrap_err_with(|| {
|
||||
format!("Failed to fetch block by number {next_block}")
|
||||
});
|
||||
let block =
|
||||
match block_res.and_then(|opt| opt.ok_or_eyre("Block not found")) {
|
||||
Ok(block) => block,
|
||||
Err(e) => {
|
||||
tracing::error!(target: "reth-bench", "Failed to fetch block {next_block}: {e}");
|
||||
return Err(e)
|
||||
}
|
||||
};
|
||||
|
||||
tokio::task::spawn(async move {
|
||||
while benchmark_mode.contains(next_block) {
|
||||
let block_res = block_provider
|
||||
.get_block_by_number(next_block.into())
|
||||
.full()
|
||||
.await
|
||||
.wrap_err_with(|| format!("Failed to fetch block by number {next_block}"));
|
||||
let block = match block_res.and_then(|opt| opt.ok_or_eyre("Block not found")) {
|
||||
Ok(block) => block,
|
||||
Err(e) => {
|
||||
tracing::error!(target: "reth-bench", "Failed to fetch block {next_block}: {e}");
|
||||
let _ = error_sender.send(e);
|
||||
break;
|
||||
let rlp = if rlp_blocks {
|
||||
let rlp = match block_provider
|
||||
.debug_get_raw_block(next_block.into())
|
||||
.await
|
||||
{
|
||||
Ok(rlp) => rlp,
|
||||
Err(e) => {
|
||||
tracing::error!(target: "reth-bench", "Failed to fetch raw block {next_block}: {e}");
|
||||
return Err(e.into())
|
||||
}
|
||||
};
|
||||
Some(rlp)
|
||||
} else {
|
||||
None
|
||||
};
|
||||
|
||||
let head_block_hash = block.header.hash;
|
||||
let safe_block_hash = block_provider
|
||||
.get_block_by_number(block.header.number.saturating_sub(32).into());
|
||||
|
||||
let finalized_block_hash = block_provider
|
||||
.get_block_by_number(block.header.number.saturating_sub(64).into());
|
||||
|
||||
let (safe, finalized) =
|
||||
tokio::join!(safe_block_hash, finalized_block_hash);
|
||||
|
||||
let safe_block_hash = match safe {
|
||||
Ok(Some(block)) => block.header.hash,
|
||||
Ok(None) | Err(_) => head_block_hash,
|
||||
};
|
||||
|
||||
let finalized_block_hash = match finalized {
|
||||
Ok(Some(block)) => block.header.hash,
|
||||
Ok(None) | Err(_) => head_block_hash,
|
||||
};
|
||||
|
||||
Ok((block, head_block_hash, safe_block_hash, finalized_block_hash, rlp))
|
||||
}
|
||||
};
|
||||
|
||||
let rlp = if rlp_blocks {
|
||||
let rlp = match block_provider.debug_get_raw_block(next_block.into()).await {
|
||||
Ok(rlp) => rlp,
|
||||
Err(e) => {
|
||||
tracing::error!(target: "reth-bench", "Failed to fetch raw block {next_block}: {e}");
|
||||
let _ = error_sender
|
||||
.send(eyre::eyre!("Failed to fetch raw block {next_block}: {e}"));
|
||||
break;
|
||||
}
|
||||
};
|
||||
Some(rlp)
|
||||
} else {
|
||||
None
|
||||
};
|
||||
|
||||
let head_block_hash = block.header.hash;
|
||||
let safe_block_hash = block_provider
|
||||
.get_block_by_number(block.header.number.saturating_sub(32).into());
|
||||
|
||||
let finalized_block_hash = block_provider
|
||||
.get_block_by_number(block.header.number.saturating_sub(64).into());
|
||||
|
||||
let (safe, finalized) = tokio::join!(safe_block_hash, finalized_block_hash,);
|
||||
|
||||
let safe_block_hash = match safe {
|
||||
Ok(Some(block)) => block.header.hash,
|
||||
Ok(None) | Err(_) => head_block_hash,
|
||||
};
|
||||
|
||||
let finalized_block_hash = match finalized {
|
||||
Ok(Some(block)) => block.header.hash,
|
||||
Ok(None) | Err(_) => head_block_hash,
|
||||
};
|
||||
|
||||
next_block += 1;
|
||||
if let Err(e) = sender
|
||||
.send((block, head_block_hash, safe_block_hash, finalized_block_hash, rlp))
|
||||
.await
|
||||
{
|
||||
tracing::error!(target: "reth-bench", "Failed to send block data: {e}");
|
||||
break;
|
||||
}
|
||||
}
|
||||
});
|
||||
})
|
||||
.buffered(buffer_size),
|
||||
);
|
||||
|
||||
let mut results = Vec::new();
|
||||
let mut blocks_processed = 0u64;
|
||||
@@ -239,7 +241,7 @@ impl Command {
|
||||
|
||||
while let Some((block, head, safe, finalized, rlp)) = {
|
||||
let wait_start = Instant::now();
|
||||
let result = receiver.recv().await;
|
||||
let result = blocks.try_next().await?;
|
||||
total_wait_time += wait_start.elapsed();
|
||||
result
|
||||
} {
|
||||
@@ -324,11 +326,6 @@ impl Command {
|
||||
results.push((gas_row, combined_result));
|
||||
}
|
||||
|
||||
// Check if the spawned task encountered an error
|
||||
if let Ok(error) = error_receiver.try_recv() {
|
||||
return Err(error);
|
||||
}
|
||||
|
||||
// Drop waiter - we don't need to wait for final blocks to persist
|
||||
// since the benchmark goal is measuring Ggas/s of newPayload/FCU, not persistence.
|
||||
drop(waiter);
|
||||
|
||||
Reference in New Issue
Block a user