mirror of
https://github.com/paradigmxyz/reth.git
synced 2026-02-19 03:04:27 -05:00
chore(reth-bench): use "reth-bench" log target (#21870)
Co-authored-by: Amp <amp@ampcode.com>
This commit is contained in:
committed by
GitHub
parent
cd816ce211
commit
df8f411f50
@@ -35,7 +35,7 @@ impl BenchContext {
|
||||
/// This is the initialization code for most benchmarks, taking in a [`BenchmarkArgs`] and
|
||||
/// returning the providers needed to run a benchmark.
|
||||
pub(crate) async fn new(bench_args: &BenchmarkArgs, rpc_url: String) -> eyre::Result<Self> {
|
||||
info!("Running benchmark using data from RPC URL: {}", rpc_url);
|
||||
info!(target: "reth-bench", "Running benchmark using data from RPC URL: {}", rpc_url);
|
||||
|
||||
// Ensure that output directory exists and is a directory
|
||||
if let Some(output) = &bench_args.output {
|
||||
@@ -45,7 +45,7 @@ impl BenchContext {
|
||||
// Create the directory if it doesn't exist
|
||||
if !output.exists() {
|
||||
std::fs::create_dir_all(output)?;
|
||||
info!("Created output directory: {:?}", output);
|
||||
info!(target: "reth-bench", "Created output directory: {:?}", output);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -77,7 +77,7 @@ impl BenchContext {
|
||||
let auth_url = Url::parse(&bench_args.engine_rpc_url)?;
|
||||
|
||||
// construct the authed transport
|
||||
info!("Connecting to Engine RPC at {} for replay", auth_url);
|
||||
info!(target: "reth-bench", "Connecting to Engine RPC at {} for replay", auth_url);
|
||||
let auth_transport = AuthenticatedTransportConnect::new(auth_url, jwt);
|
||||
let client = ClientBuilder::default().connect_with(auth_transport).await?;
|
||||
let auth_provider = RootProvider::<AnyNetwork>::new(client);
|
||||
|
||||
@@ -87,7 +87,7 @@ impl Command {
|
||||
}
|
||||
if !self.output.exists() {
|
||||
std::fs::create_dir_all(&self.output)?;
|
||||
info!("Created output directory: {:?}", self.output);
|
||||
info!(target: "reth-bench", "Created output directory: {:?}", self.output);
|
||||
}
|
||||
|
||||
// Set up authenticated provider (used for both Engine API and eth_ methods)
|
||||
@@ -95,7 +95,7 @@ impl Command {
|
||||
let jwt = JwtSecret::from_hex(jwt)?;
|
||||
let auth_url = Url::parse(&self.engine_rpc_url)?;
|
||||
|
||||
info!("Connecting to Engine RPC at {}", auth_url);
|
||||
info!(target: "reth-bench", "Connecting to Engine RPC at {}", auth_url);
|
||||
let auth_transport = AuthenticatedTransportConnect::new(auth_url, jwt);
|
||||
let client = ClientBuilder::default().connect_with(auth_transport).await?;
|
||||
let provider = RootProvider::<AnyNetwork>::new(client);
|
||||
@@ -120,6 +120,7 @@ impl Command {
|
||||
match mode {
|
||||
RampMode::Blocks(blocks) => {
|
||||
info!(
|
||||
target: "reth-bench",
|
||||
canonical_parent,
|
||||
start_block,
|
||||
end_block = start_block + blocks - 1,
|
||||
@@ -128,6 +129,7 @@ impl Command {
|
||||
}
|
||||
RampMode::TargetGasLimit(target) => {
|
||||
info!(
|
||||
target: "reth-bench",
|
||||
canonical_parent,
|
||||
start_block,
|
||||
current_gas_limit = parent_header.gas_limit,
|
||||
@@ -176,7 +178,7 @@ impl Command {
|
||||
GasRampPayloadFile { version: version as u8, block_hash, params: params.clone() };
|
||||
let payload_json = serde_json::to_string_pretty(&file)?;
|
||||
std::fs::write(&payload_path, &payload_json)?;
|
||||
info!(block_number = block.header.number, path = %payload_path.display(), "Saved payload");
|
||||
info!(target: "reth-bench", block_number = block.header.number, path = %payload_path.display(), "Saved payload");
|
||||
|
||||
call_new_payload(&provider, version, params).await?;
|
||||
|
||||
@@ -194,6 +196,7 @@ impl Command {
|
||||
|
||||
let final_gas_limit = parent_header.gas_limit;
|
||||
info!(
|
||||
target: "reth-bench",
|
||||
total_duration=?total_benchmark_duration.elapsed(),
|
||||
blocks_processed,
|
||||
final_gas_limit,
|
||||
|
||||
@@ -152,7 +152,7 @@ impl<S: TransactionSource> TransactionCollector<S> {
|
||||
while total_gas < gas_target {
|
||||
let Some((block_txs, _)) = self.source.fetch_block_transactions(current_block).await?
|
||||
else {
|
||||
warn!(block = current_block, "Block not found, stopping");
|
||||
warn!(target: "reth-bench", block = current_block, "Block not found, stopping");
|
||||
break;
|
||||
};
|
||||
|
||||
@@ -182,6 +182,7 @@ impl<S: TransactionSource> TransactionCollector<S> {
|
||||
}
|
||||
|
||||
info!(
|
||||
target: "reth-bench",
|
||||
total_txs = transactions.len(),
|
||||
gas_sent = total_gas,
|
||||
next_block = current_block,
|
||||
@@ -299,10 +300,10 @@ async fn fetch_batch_with_retry<S: TransactionSource>(
|
||||
Ok(result) => return Some(result),
|
||||
Err(e) => {
|
||||
if attempt == MAX_FETCH_RETRIES {
|
||||
warn!(attempt, error = %e, "Failed to fetch transactions after max retries");
|
||||
warn!(target: "reth-bench", attempt, error = %e, "Failed to fetch transactions after max retries");
|
||||
return None;
|
||||
}
|
||||
warn!(attempt, error = %e, "Failed to fetch transactions, retrying...");
|
||||
warn!(target: "reth-bench", attempt, error = %e, "Failed to fetch transactions, retrying...");
|
||||
tokio::time::sleep(std::time::Duration::from_secs(1)).await;
|
||||
}
|
||||
}
|
||||
@@ -342,7 +343,7 @@ impl TxBuffer {
|
||||
impl Command {
|
||||
/// Execute the `generate-big-block` command
|
||||
pub async fn execute(self, _ctx: CliContext) -> eyre::Result<()> {
|
||||
info!(target_gas = self.target_gas, count = self.count, "Generating big block(s)");
|
||||
info!(target: "reth-bench", target_gas = self.target_gas, count = self.count, "Generating big block(s)");
|
||||
|
||||
// Set up authenticated engine provider
|
||||
let jwt =
|
||||
@@ -350,20 +351,20 @@ impl Command {
|
||||
let jwt = JwtSecret::from_hex(jwt.trim())?;
|
||||
let auth_url = Url::parse(&self.engine_rpc_url)?;
|
||||
|
||||
info!("Connecting to Engine RPC at {}", auth_url);
|
||||
info!(target: "reth-bench", "Connecting to Engine RPC at {}", auth_url);
|
||||
let auth_transport = AuthenticatedTransportConnect::new(auth_url.clone(), jwt);
|
||||
let auth_client = ClientBuilder::default().connect_with(auth_transport).await?;
|
||||
let auth_provider = RootProvider::<AnyNetwork>::new(auth_client);
|
||||
|
||||
// Set up testing RPC provider (for testing_buildBlockV1)
|
||||
info!("Connecting to Testing RPC at {}", self.testing_rpc_url);
|
||||
info!(target: "reth-bench", "Connecting to Testing RPC at {}", self.testing_rpc_url);
|
||||
let testing_client = ClientBuilder::default()
|
||||
.layer(RetryBackoffLayer::new(10, 800, u64::MAX))
|
||||
.http(self.testing_rpc_url.parse()?);
|
||||
let testing_provider = RootProvider::<AnyNetwork>::new(testing_client);
|
||||
|
||||
// Get the parent block (latest canonical block)
|
||||
info!(endpoint = "engine", method = "eth_getBlockByNumber", block = "latest", "RPC call");
|
||||
info!(target: "reth-bench", endpoint = "engine", method = "eth_getBlockByNumber", block = "latest", "RPC call");
|
||||
let parent_block = auth_provider
|
||||
.get_block_by_number(BlockNumberOrTag::Latest)
|
||||
.await?
|
||||
@@ -374,6 +375,7 @@ impl Command {
|
||||
let parent_timestamp = parent_block.header.timestamp;
|
||||
|
||||
info!(
|
||||
target: "reth-bench",
|
||||
parent_hash = %parent_hash,
|
||||
parent_number = parent_number,
|
||||
"Using initial parent block"
|
||||
@@ -417,7 +419,7 @@ impl Command {
|
||||
.await?;
|
||||
}
|
||||
|
||||
info!(count = self.count, output_dir = %self.output_dir.display(), "All payloads generated");
|
||||
info!(target: "reth-bench", count = self.count, output_dir = %self.output_dir.display(), "All payloads generated");
|
||||
Ok(())
|
||||
}
|
||||
|
||||
@@ -448,9 +450,9 @@ impl Command {
|
||||
self.save_payload(&built)?;
|
||||
|
||||
if self.execute || self.count > 1 {
|
||||
info!(payload = i + 1, block_hash = %built.block_hash, gas_used = built.gas_used, "Executing payload (newPayload + FCU)");
|
||||
info!(target: "reth-bench", payload = i + 1, block_hash = %built.block_hash, gas_used = built.gas_used, "Executing payload (newPayload + FCU)");
|
||||
self.execute_payload_v4(auth_provider, built.envelope, parent_hash).await?;
|
||||
info!(payload = i + 1, "Payload executed successfully");
|
||||
info!(target: "reth-bench", payload = i + 1, "Payload executed successfully");
|
||||
}
|
||||
|
||||
parent_hash = built.block_hash;
|
||||
@@ -477,6 +479,7 @@ impl Command {
|
||||
let gas_sent = result.gas_sent;
|
||||
|
||||
info!(
|
||||
target: "reth-bench",
|
||||
payload = index + 1,
|
||||
attempt,
|
||||
tx_count = tx_bytes.len(),
|
||||
@@ -506,7 +509,7 @@ impl Command {
|
||||
}
|
||||
}
|
||||
|
||||
warn!(payload = index + 1, "Retry loop exited without returning a payload");
|
||||
warn!(target: "reth-bench", payload = index + 1, "Retry loop exited without returning a payload");
|
||||
Err(eyre::eyre!("build_with_retry exhausted retries without result"))
|
||||
}
|
||||
|
||||
@@ -534,7 +537,7 @@ impl Command {
|
||||
let tx_source = match RpcTransactionSource::from_url(&rpc_url) {
|
||||
Ok(source) => source,
|
||||
Err(e) => {
|
||||
warn!(error = %e, "Failed to create transaction source");
|
||||
warn!(target: "reth-bench", error = %e, "Failed to create transaction source");
|
||||
return None;
|
||||
}
|
||||
};
|
||||
@@ -544,11 +547,12 @@ impl Command {
|
||||
|
||||
while let Some(batch) = fetch_batch_with_retry(&collector, current_block).await {
|
||||
if batch.transactions.is_empty() {
|
||||
info!(block = current_block, "Reached chain tip, stopping fetcher");
|
||||
info!(target: "reth-bench", block = current_block, "Reached chain tip, stopping fetcher");
|
||||
break;
|
||||
}
|
||||
|
||||
info!(
|
||||
target: "reth-bench",
|
||||
tx_count = batch.transactions.len(),
|
||||
gas_sent = batch.gas_sent,
|
||||
blocks = format!("{}..{}", current_block, batch.next_block),
|
||||
@@ -574,6 +578,7 @@ impl Command {
|
||||
// Get initial batch of transactions for this payload
|
||||
let Some(mut result) = tx_buffer.take_batch().await else {
|
||||
info!(
|
||||
target: "reth-bench",
|
||||
payloads_built = i,
|
||||
payloads_requested = self.count,
|
||||
"Transaction source exhausted, stopping"
|
||||
@@ -583,6 +588,7 @@ impl Command {
|
||||
|
||||
if result.transactions.is_empty() {
|
||||
info!(
|
||||
target: "reth-bench",
|
||||
payloads_built = i,
|
||||
payloads_requested = self.count,
|
||||
"No more transactions available, stopping"
|
||||
@@ -608,9 +614,9 @@ impl Command {
|
||||
let current_timestamp = built.timestamp;
|
||||
|
||||
// Execute payload
|
||||
info!(payload = i + 1, block_hash = %current_block_hash, gas_used = built.gas_used, "Executing payload (newPayload + FCU)");
|
||||
info!(target: "reth-bench", payload = i + 1, block_hash = %current_block_hash, gas_used = built.gas_used, "Executing payload (newPayload + FCU)");
|
||||
self.execute_payload_v4(auth_provider, built.envelope, parent_hash).await?;
|
||||
info!(payload = i + 1, "Payload executed successfully");
|
||||
info!(target: "reth-bench", payload = i + 1, "Payload executed successfully");
|
||||
|
||||
parent_hash = current_block_hash;
|
||||
parent_timestamp = current_timestamp;
|
||||
@@ -638,6 +644,7 @@ impl Command {
|
||||
let gas_sent = result.gas_sent;
|
||||
|
||||
info!(
|
||||
target: "reth-bench",
|
||||
payload = index + 1,
|
||||
attempt,
|
||||
tx_count = tx_bytes.len(),
|
||||
@@ -666,7 +673,7 @@ impl Command {
|
||||
result.gas_sent = result.gas_sent.saturating_add(batch.gas_sent);
|
||||
result.next_block = batch.next_block;
|
||||
} else {
|
||||
warn!("Transaction fetcher exhausted, proceeding with available transactions");
|
||||
warn!(target: "reth-bench", "Transaction fetcher exhausted, proceeding with available transactions");
|
||||
break;
|
||||
}
|
||||
}
|
||||
@@ -674,7 +681,7 @@ impl Command {
|
||||
}
|
||||
}
|
||||
|
||||
warn!(payload = index + 1, "Retry loop exited without returning a payload");
|
||||
warn!(target: "reth-bench", payload = index + 1, "Retry loop exited without returning a payload");
|
||||
Err(eyre::eyre!("build_with_retry_buffered exhausted retries without result"))
|
||||
}
|
||||
|
||||
@@ -690,6 +697,7 @@ impl Command {
|
||||
|
||||
if gas_used + MIN_TARGET_SLACK >= self.target_gas {
|
||||
info!(
|
||||
target: "reth-bench",
|
||||
payload = index + 1,
|
||||
gas_used,
|
||||
target_gas = self.target_gas,
|
||||
@@ -701,6 +709,7 @@ impl Command {
|
||||
|
||||
if attempt == MAX_BUILD_RETRIES {
|
||||
warn!(
|
||||
target: "reth-bench",
|
||||
payload = index + 1,
|
||||
gas_used,
|
||||
target_gas = self.target_gas,
|
||||
@@ -712,6 +721,7 @@ impl Command {
|
||||
|
||||
if gas_used == 0 {
|
||||
warn!(
|
||||
target: "reth-bench",
|
||||
payload = index + 1,
|
||||
"Zero gas used in payload, requesting fixed chunk of additional transactions"
|
||||
);
|
||||
@@ -725,6 +735,7 @@ impl Command {
|
||||
|
||||
if additional == 0 {
|
||||
info!(
|
||||
target: "reth-bench",
|
||||
payload = index + 1,
|
||||
gas_used,
|
||||
target_gas = self.target_gas,
|
||||
@@ -735,6 +746,7 @@ impl Command {
|
||||
|
||||
let ratio = gas_used as f64 / gas_sent as f64;
|
||||
info!(
|
||||
target: "reth-bench",
|
||||
payload = index + 1,
|
||||
gas_used,
|
||||
gas_sent,
|
||||
@@ -768,6 +780,7 @@ impl Command {
|
||||
|
||||
let total_tx_bytes: usize = transactions.iter().map(|tx| tx.len()).sum();
|
||||
info!(
|
||||
target: "reth-bench",
|
||||
payload = index + 1,
|
||||
tx_count = transactions.len(),
|
||||
total_tx_bytes = total_tx_bytes,
|
||||
@@ -795,7 +808,7 @@ impl Command {
|
||||
let json = serde_json::to_string_pretty(&payload.envelope)?;
|
||||
std::fs::write(&filepath, &json)
|
||||
.wrap_err_with(|| format!("Failed to write payload to {:?}", filepath))?;
|
||||
info!(block_number = payload.block_number, block_hash = %payload.block_hash, path = %filepath.display(), "Payload saved");
|
||||
info!(target: "reth-bench", block_number = payload.block_number, block_hash = %payload.block_hash, path = %filepath.display(), "Payload saved");
|
||||
Ok(())
|
||||
}
|
||||
|
||||
|
||||
@@ -180,7 +180,7 @@ pub(crate) async fn get_payload_with_sidecar(
|
||||
payload_id: PayloadId,
|
||||
parent_beacon_block_root: Option<B256>,
|
||||
) -> eyre::Result<(ExecutionPayload, ExecutionPayloadSidecar)> {
|
||||
debug!(get_payload_version = ?version, ?payload_id, "Sending getPayload");
|
||||
debug!(target: "reth-bench", get_payload_version = ?version, ?payload_id, "Sending getPayload");
|
||||
|
||||
match version {
|
||||
1 => {
|
||||
|
||||
@@ -111,7 +111,18 @@ impl BenchmarkCommand {
|
||||
///
|
||||
/// If file logging is enabled, this function returns a guard that must be kept alive to ensure
|
||||
/// that all logs are flushed to disk.
|
||||
///
|
||||
/// Always enables log target display (`RUST_LOG_TARGET=1`) so that the `reth-bench` target
|
||||
/// is visible in output, making it easy to distinguish reth-bench logs from reth logs when
|
||||
/// both are streamed to the same console or file.
|
||||
pub fn init_tracing(&self) -> eyre::Result<Option<FileWorkerGuard>> {
|
||||
// Always show the log target so "reth-bench" is visible in the output.
|
||||
if std::env::var_os("RUST_LOG_TARGET").is_none() {
|
||||
// SAFETY: This is called early during single-threaded initialization, before any
|
||||
// threads are spawned and before the tracing subscriber is set up.
|
||||
unsafe { std::env::set_var("RUST_LOG_TARGET", "1") };
|
||||
}
|
||||
|
||||
let guard = self.logs.init_tracing()?;
|
||||
Ok(guard)
|
||||
}
|
||||
|
||||
@@ -86,10 +86,11 @@ impl Command {
|
||||
pub async fn execute(self, _ctx: CliContext) -> eyre::Result<()> {
|
||||
// Log mode configuration
|
||||
if let Some(duration) = self.wait_time {
|
||||
info!("Using wait-time mode with {}ms delay between blocks", duration.as_millis());
|
||||
info!(target: "reth-bench", "Using wait-time mode with {}ms delay between blocks", duration.as_millis());
|
||||
}
|
||||
if self.wait_for_persistence {
|
||||
info!(
|
||||
target: "reth-bench",
|
||||
"Persistence waiting enabled (waits after every {} blocks to match engine gap > {} behavior)",
|
||||
self.persistence_threshold + 1,
|
||||
self.persistence_threshold
|
||||
@@ -153,7 +154,7 @@ impl Command {
|
||||
let block = match block_res.and_then(|opt| opt.ok_or_eyre("Block not found")) {
|
||||
Ok(block) => block,
|
||||
Err(e) => {
|
||||
tracing::error!("Failed to fetch block {next_block}: {e}");
|
||||
tracing::error!(target: "reth-bench", "Failed to fetch block {next_block}: {e}");
|
||||
let _ = error_sender.send(e);
|
||||
break;
|
||||
}
|
||||
@@ -183,7 +184,7 @@ impl Command {
|
||||
.send((block, head_block_hash, safe_block_hash, finalized_block_hash))
|
||||
.await
|
||||
{
|
||||
tracing::error!("Failed to send block data: {e}");
|
||||
tracing::error!(target: "reth-bench", "Failed to send block data: {e}");
|
||||
break;
|
||||
}
|
||||
}
|
||||
@@ -234,7 +235,7 @@ impl Command {
|
||||
// Exclude time spent waiting on the block prefetch channel from the benchmark duration.
|
||||
// We want to measure engine throughput, not RPC fetch latency.
|
||||
let current_duration = total_benchmark_duration.elapsed() - total_wait_time;
|
||||
info!(%combined_result);
|
||||
info!(target: "reth-bench", %combined_result);
|
||||
|
||||
if let Some(w) = &mut waiter {
|
||||
w.on_block(block_number).await?;
|
||||
@@ -265,6 +266,7 @@ impl Command {
|
||||
TotalGasOutput::with_combined_results(gas_output_results, &combined_results)?;
|
||||
|
||||
info!(
|
||||
target: "reth-bench",
|
||||
total_gas_used = gas_output.total_gas_used,
|
||||
total_duration = ?gas_output.total_duration,
|
||||
execution_duration = ?gas_output.execution_duration,
|
||||
|
||||
@@ -68,7 +68,7 @@ impl Command {
|
||||
let block = match block_res.and_then(|opt| opt.ok_or_eyre("Block not found")) {
|
||||
Ok(block) => block,
|
||||
Err(e) => {
|
||||
tracing::error!("Failed to fetch block {next_block}: {e}");
|
||||
tracing::error!(target: "reth-bench", "Failed to fetch block {next_block}: {e}");
|
||||
let _ = error_sender.send(e);
|
||||
break;
|
||||
}
|
||||
@@ -76,7 +76,7 @@ impl Command {
|
||||
|
||||
next_block += 1;
|
||||
if let Err(e) = sender.send(block).await {
|
||||
tracing::error!("Failed to send block data: {e}");
|
||||
tracing::error!(target: "reth-bench", "Failed to send block data: {e}");
|
||||
break;
|
||||
}
|
||||
}
|
||||
@@ -97,7 +97,7 @@ impl Command {
|
||||
let transaction_count = block.transactions.len() as u64;
|
||||
let gas_used = block.header.gas_used;
|
||||
|
||||
debug!(number=?block.header.number, "Sending payload to engine");
|
||||
debug!(target: "reth-bench", number=?block.header.number, "Sending payload to engine");
|
||||
|
||||
let (version, params) = block_to_new_payload(block, is_optimism)?;
|
||||
|
||||
@@ -105,7 +105,7 @@ impl Command {
|
||||
call_new_payload(&auth_provider, version, params).await?;
|
||||
|
||||
let new_payload_result = NewPayloadResult { gas_used, latency: start.elapsed() };
|
||||
info!(%new_payload_result);
|
||||
info!(target: "reth-bench", %new_payload_result);
|
||||
|
||||
// current duration since the start of the benchmark minus the time
|
||||
// waiting for blocks
|
||||
@@ -129,7 +129,7 @@ impl Command {
|
||||
if let Some(path) = self.benchmark.output {
|
||||
// first write the new payload results to a file
|
||||
let output_path = path.join(NEW_PAYLOAD_OUTPUT_SUFFIX);
|
||||
info!("Writing newPayload call latency output to file: {:?}", output_path);
|
||||
info!(target: "reth-bench", "Writing newPayload call latency output to file: {:?}", output_path);
|
||||
let mut writer = Writer::from_path(output_path)?;
|
||||
for result in new_payload_results {
|
||||
writer.serialize(result)?;
|
||||
@@ -138,19 +138,20 @@ impl Command {
|
||||
|
||||
// now write the gas output to a file
|
||||
let output_path = path.join(GAS_OUTPUT_SUFFIX);
|
||||
info!("Writing total gas output to file: {:?}", output_path);
|
||||
info!(target: "reth-bench", "Writing total gas output to file: {:?}", output_path);
|
||||
let mut writer = Writer::from_path(output_path)?;
|
||||
for row in &gas_output_results {
|
||||
writer.serialize(row)?;
|
||||
}
|
||||
writer.flush()?;
|
||||
|
||||
info!("Finished writing benchmark output files to {:?}.", path);
|
||||
info!(target: "reth-bench", "Finished writing benchmark output files to {:?}.", path);
|
||||
}
|
||||
|
||||
// accumulate the results and calculate the overall Ggas/s
|
||||
let gas_output = TotalGasOutput::new(gas_output_results)?;
|
||||
info!(
|
||||
target: "reth-bench",
|
||||
total_duration=?gas_output.total_duration,
|
||||
total_gas_used=?gas_output.total_gas_used,
|
||||
blocks_processed=?gas_output.blocks_processed,
|
||||
|
||||
@@ -226,7 +226,7 @@ pub(crate) fn write_benchmark_results(
|
||||
fs::create_dir_all(output_dir)?;
|
||||
|
||||
let output_path = output_dir.join(COMBINED_OUTPUT_SUFFIX);
|
||||
info!("Writing engine api call latency output to file: {:?}", output_path);
|
||||
info!(target: "reth-bench", "Writing engine api call latency output to file: {:?}", output_path);
|
||||
let mut writer = Writer::from_path(&output_path)?;
|
||||
for result in combined_results {
|
||||
writer.serialize(result)?;
|
||||
@@ -234,14 +234,14 @@ pub(crate) fn write_benchmark_results(
|
||||
writer.flush()?;
|
||||
|
||||
let output_path = output_dir.join(GAS_OUTPUT_SUFFIX);
|
||||
info!("Writing total gas output to file: {:?}", output_path);
|
||||
info!(target: "reth-bench", "Writing total gas output to file: {:?}", output_path);
|
||||
let mut writer = Writer::from_path(&output_path)?;
|
||||
for row in gas_results {
|
||||
writer.serialize(row)?;
|
||||
}
|
||||
writer.flush()?;
|
||||
|
||||
info!("Finished writing benchmark output files to {:?}.", output_dir);
|
||||
info!(target: "reth-bench", "Finished writing benchmark output files to {:?}.", output_dir);
|
||||
Ok(())
|
||||
}
|
||||
|
||||
|
||||
@@ -160,7 +160,7 @@ impl PersistenceSubscription {
|
||||
pub(crate) async fn setup_persistence_subscription(
|
||||
ws_url: Url,
|
||||
) -> eyre::Result<PersistenceSubscription> {
|
||||
info!("Connecting to WebSocket at {} for persistence subscription", ws_url);
|
||||
info!(target: "reth-bench", "Connecting to WebSocket at {} for persistence subscription", ws_url);
|
||||
|
||||
let ws_connect = WsConnect::new(ws_url.to_string());
|
||||
let client = RpcClient::connect_pubsub(ws_connect)
|
||||
@@ -173,7 +173,7 @@ pub(crate) async fn setup_persistence_subscription(
|
||||
.await
|
||||
.wrap_err("Failed to subscribe to persistence notifications")?;
|
||||
|
||||
info!("Subscribed to persistence notifications");
|
||||
info!(target: "reth-bench", "Subscribed to persistence notifications");
|
||||
|
||||
Ok(PersistenceSubscription::new(provider, subscription.into_stream()))
|
||||
}
|
||||
|
||||
@@ -134,14 +134,15 @@ struct GasRampPayload {
|
||||
impl Command {
|
||||
/// Execute the `replay-payloads` command.
|
||||
pub async fn execute(self, _ctx: CliContext) -> eyre::Result<()> {
|
||||
info!(payload_dir = %self.payload_dir.display(), "Replaying payloads");
|
||||
info!(target: "reth-bench", payload_dir = %self.payload_dir.display(), "Replaying payloads");
|
||||
|
||||
// Log mode configuration
|
||||
if let Some(duration) = self.wait_time {
|
||||
info!("Using wait-time mode with {}ms delay between blocks", duration.as_millis());
|
||||
info!(target: "reth-bench", "Using wait-time mode with {}ms delay between blocks", duration.as_millis());
|
||||
}
|
||||
if self.wait_for_persistence {
|
||||
info!(
|
||||
target: "reth-bench",
|
||||
"Persistence waiting enabled (waits after every {} blocks to match engine gap > {} behavior)",
|
||||
self.persistence_threshold + 1,
|
||||
self.persistence_threshold
|
||||
@@ -180,7 +181,7 @@ impl Command {
|
||||
let jwt = JwtSecret::from_hex(jwt.trim())?;
|
||||
let auth_url = Url::parse(&self.engine_rpc_url)?;
|
||||
|
||||
info!("Connecting to Engine RPC at {}", auth_url);
|
||||
info!(target: "reth-bench", "Connecting to Engine RPC at {}", auth_url);
|
||||
let auth_transport = AuthenticatedTransportConnect::new(auth_url.clone(), jwt);
|
||||
let auth_client = ClientBuilder::default().connect_with(auth_transport).await?;
|
||||
let auth_provider = RootProvider::<AnyNetwork>::new(auth_client);
|
||||
@@ -195,6 +196,7 @@ impl Command {
|
||||
let initial_parent_number = parent_block.header.number;
|
||||
|
||||
info!(
|
||||
target: "reth-bench",
|
||||
parent_hash = %initial_parent_hash,
|
||||
parent_number = initial_parent_number,
|
||||
"Using initial parent block"
|
||||
@@ -206,7 +208,7 @@ impl Command {
|
||||
if payloads.is_empty() {
|
||||
return Err(eyre::eyre!("No gas ramp payload files found in {:?}", gas_ramp_dir));
|
||||
}
|
||||
info!(count = payloads.len(), "Loaded gas ramp payloads from disk");
|
||||
info!(target: "reth-bench", count = payloads.len(), "Loaded gas ramp payloads from disk");
|
||||
payloads
|
||||
} else {
|
||||
Vec::new()
|
||||
@@ -216,13 +218,14 @@ impl Command {
|
||||
if payloads.is_empty() {
|
||||
return Err(eyre::eyre!("No payload files found in {:?}", self.payload_dir));
|
||||
}
|
||||
info!(count = payloads.len(), "Loaded main payloads from disk");
|
||||
info!(target: "reth-bench", count = payloads.len(), "Loaded main payloads from disk");
|
||||
|
||||
let mut parent_hash = initial_parent_hash;
|
||||
|
||||
// Replay gas ramp payloads first
|
||||
for (i, payload) in gas_ramp_payloads.iter().enumerate() {
|
||||
info!(
|
||||
target: "reth-bench",
|
||||
gas_ramp_payload = i + 1,
|
||||
total = gas_ramp_payloads.len(),
|
||||
block_number = payload.block_number,
|
||||
@@ -239,7 +242,7 @@ impl Command {
|
||||
};
|
||||
call_forkchoice_updated(&auth_provider, payload.version, fcu_state, None).await?;
|
||||
|
||||
info!(gas_ramp_payload = i + 1, "Gas ramp payload executed successfully");
|
||||
info!(target: "reth-bench", gas_ramp_payload = i + 1, "Gas ramp payload executed successfully");
|
||||
|
||||
if let Some(w) = &mut waiter {
|
||||
w.on_block(payload.block_number).await?;
|
||||
@@ -249,7 +252,7 @@ impl Command {
|
||||
}
|
||||
|
||||
if !gas_ramp_payloads.is_empty() {
|
||||
info!(count = gas_ramp_payloads.len(), "All gas ramp payloads replayed");
|
||||
info!(target: "reth-bench", count = gas_ramp_payloads.len(), "All gas ramp payloads replayed");
|
||||
}
|
||||
|
||||
let mut results = Vec::new();
|
||||
@@ -268,6 +271,7 @@ impl Command {
|
||||
execution_payload.payload_inner.payload_inner.transactions.len() as u64;
|
||||
|
||||
debug!(
|
||||
target: "reth-bench",
|
||||
payload = i + 1,
|
||||
total = payloads.len(),
|
||||
index = payload.index,
|
||||
@@ -278,6 +282,7 @@ impl Command {
|
||||
let start = Instant::now();
|
||||
|
||||
debug!(
|
||||
target: "reth-bench",
|
||||
method = "engine_newPayloadV4",
|
||||
block_hash = %block_hash,
|
||||
"Sending newPayload"
|
||||
@@ -304,7 +309,7 @@ impl Command {
|
||||
finalized_block_hash: parent_hash,
|
||||
};
|
||||
|
||||
debug!(method = "engine_forkchoiceUpdatedV3", ?fcu_state, "Sending forkchoiceUpdated");
|
||||
debug!(target: "reth-bench", method = "engine_forkchoiceUpdatedV3", ?fcu_state, "Sending forkchoiceUpdated");
|
||||
|
||||
let fcu_result = auth_provider.fork_choice_updated_v3(fcu_state, None).await?;
|
||||
|
||||
@@ -321,7 +326,7 @@ impl Command {
|
||||
};
|
||||
|
||||
let current_duration = total_benchmark_duration.elapsed();
|
||||
info!(%combined_result);
|
||||
info!(target: "reth-bench", %combined_result);
|
||||
|
||||
if let Some(w) = &mut waiter {
|
||||
w.on_block(block_number).await?;
|
||||
@@ -331,7 +336,7 @@ impl Command {
|
||||
TotalGasRow { block_number, transaction_count, gas_used, time: current_duration };
|
||||
results.push((gas_row, combined_result));
|
||||
|
||||
debug!(?status, ?fcu_result, "Payload executed successfully");
|
||||
debug!(target: "reth-bench", ?status, ?fcu_result, "Payload executed successfully");
|
||||
parent_hash = block_hash;
|
||||
}
|
||||
|
||||
@@ -349,6 +354,7 @@ impl Command {
|
||||
let gas_output =
|
||||
TotalGasOutput::with_combined_results(gas_output_results, &combined_results)?;
|
||||
info!(
|
||||
target: "reth-bench",
|
||||
total_gas_used = gas_output.total_gas_used,
|
||||
total_duration = ?gas_output.total_duration,
|
||||
execution_duration = ?gas_output.execution_duration,
|
||||
@@ -407,7 +413,8 @@ impl Command {
|
||||
let block_hash =
|
||||
envelope.envelope_inner.execution_payload.payload_inner.payload_inner.block_hash;
|
||||
|
||||
info!(
|
||||
debug!(
|
||||
target: "reth-bench",
|
||||
index = index,
|
||||
block_hash = %block_hash,
|
||||
path = %path.display(),
|
||||
|
||||
@@ -54,6 +54,7 @@ where
|
||||
payload_attributes: Option<PayloadAttributes>,
|
||||
) -> TransportResult<ForkchoiceUpdated> {
|
||||
debug!(
|
||||
target: "reth-bench",
|
||||
method = "engine_forkchoiceUpdatedV1",
|
||||
?fork_choice_state,
|
||||
?payload_attributes,
|
||||
@@ -66,6 +67,7 @@ where
|
||||
while !status.is_valid() {
|
||||
if status.is_invalid() {
|
||||
error!(
|
||||
target: "reth-bench",
|
||||
?status,
|
||||
?fork_choice_state,
|
||||
?payload_attributes,
|
||||
@@ -91,6 +93,7 @@ where
|
||||
payload_attributes: Option<PayloadAttributes>,
|
||||
) -> TransportResult<ForkchoiceUpdated> {
|
||||
debug!(
|
||||
target: "reth-bench",
|
||||
method = "engine_forkchoiceUpdatedV2",
|
||||
?fork_choice_state,
|
||||
?payload_attributes,
|
||||
@@ -103,6 +106,7 @@ where
|
||||
while !status.is_valid() {
|
||||
if status.is_invalid() {
|
||||
error!(
|
||||
target: "reth-bench",
|
||||
?status,
|
||||
?fork_choice_state,
|
||||
?payload_attributes,
|
||||
@@ -128,6 +132,7 @@ where
|
||||
payload_attributes: Option<PayloadAttributes>,
|
||||
) -> TransportResult<ForkchoiceUpdated> {
|
||||
debug!(
|
||||
target: "reth-bench",
|
||||
method = "engine_forkchoiceUpdatedV3",
|
||||
?fork_choice_state,
|
||||
?payload_attributes,
|
||||
@@ -140,6 +145,7 @@ where
|
||||
while !status.is_valid() {
|
||||
if status.is_invalid() {
|
||||
error!(
|
||||
target: "reth-bench",
|
||||
?status,
|
||||
?fork_choice_state,
|
||||
?payload_attributes,
|
||||
@@ -253,13 +259,13 @@ pub(crate) async fn call_new_payload<N: Network, P: Provider<N>>(
|
||||
) -> TransportResult<()> {
|
||||
let method = version.method_name();
|
||||
|
||||
debug!(method, "Sending newPayload");
|
||||
debug!(target: "reth-bench", method, "Sending newPayload");
|
||||
|
||||
let mut status: PayloadStatus = provider.client().request(method, ¶ms).await?;
|
||||
|
||||
while !status.is_valid() {
|
||||
if status.is_invalid() {
|
||||
error!(?status, ?params, "Invalid {method}",);
|
||||
error!(target: "reth-bench", ?status, ?params, "Invalid {method}",);
|
||||
return Err(alloy_json_rpc::RpcError::LocalUsageError(Box::new(std::io::Error::other(
|
||||
format!("Invalid {method}: {status:?}"),
|
||||
))))
|
||||
|
||||
Reference in New Issue
Block a user