From 0901c2ca8befc1ae892b723cff4754201f8595d5 Mon Sep 17 00:00:00 2001 From: Dan Cline <6798349+Rjected@users.noreply.github.com> Date: Thu, 29 Jan 2026 10:08:54 +0000 Subject: [PATCH] fix(reth-bench): retry testing_buildBlockV1 when payload gas < target (#21547) Co-authored-by: Alexey Shekhirin --- .../src/bench/generate_big_block.rs | 471 ++++++++++++------ 1 file changed, 325 insertions(+), 146 deletions(-) diff --git a/bin/reth-bench/src/bench/generate_big_block.rs b/bin/reth-bench/src/bench/generate_big_block.rs index 7869ee829c..7cb7d99f29 100644 --- a/bin/reth-bench/src/bench/generate_big_block.rs +++ b/bin/reth-bench/src/bench/generate_big_block.rs @@ -132,13 +132,24 @@ impl TransactionCollector { /// Collect transactions starting from the given block number. /// /// Skips blob transactions (type 3) and collects until target gas is reached. - /// Returns the collected raw transaction bytes, total gas used, and the next block number. - pub async fn collect(&self, start_block: u64) -> eyre::Result<(Vec, u64, u64)> { - let mut transactions: Vec = Vec::new(); + /// Returns a `CollectionResult` with transactions, gas info, and next block. + pub async fn collect(&self, start_block: u64) -> eyre::Result { + self.collect_gas(start_block, self.target_gas).await + } + + /// Collect transactions up to a specific gas target. + /// + /// This is used both for initial collection and for retry top-ups. + pub async fn collect_gas( + &self, + start_block: u64, + gas_target: u64, + ) -> eyre::Result { + let mut transactions: Vec = Vec::new(); let mut total_gas: u64 = 0; let mut current_block = start_block; - while total_gas < self.target_gas { + while total_gas < gas_target { let Some((block_txs, _)) = self.source.fetch_block_transactions(current_block).await? else { warn!(block = current_block, "Block not found, stopping"); @@ -151,12 +162,12 @@ impl TransactionCollector { continue; } - if total_gas + tx.gas_used <= self.target_gas { - transactions.push(tx.raw); + if total_gas + tx.gas_used <= gas_target { total_gas += tx.gas_used; + transactions.push(tx); } - if total_gas >= self.target_gas { + if total_gas >= gas_target { break; } } @@ -164,7 +175,7 @@ impl TransactionCollector { current_block += 1; // Stop early if remaining gas is under 1M (close enough to target) - let remaining_gas = self.target_gas.saturating_sub(total_gas); + let remaining_gas = gas_target.saturating_sub(total_gas); if remaining_gas < 1_000_000 { break; } @@ -172,12 +183,12 @@ impl TransactionCollector { info!( total_txs = transactions.len(), - total_gas, + gas_sent = total_gas, next_block = current_block, "Finished collecting transactions" ); - Ok((transactions, total_gas, current_block)) + Ok(CollectionResult { transactions, gas_sent: total_gas, next_block: current_block }) } } @@ -252,6 +263,80 @@ struct BuiltPayload { envelope: ExecutionPayloadEnvelopeV4, block_hash: B256, timestamp: u64, + /// The actual gas used in the built block. + gas_used: u64, +} + +/// Result of collecting transactions from blocks. +#[derive(Debug)] +pub struct CollectionResult { + /// Collected transactions with their gas info. + pub transactions: Vec, + /// Total gas sent (sum of historical `gas_used` for all collected txs). + pub gas_sent: u64, + /// Next block number to continue collecting from. + pub next_block: u64, +} + +/// Constants for retry logic. +const MAX_BUILD_RETRIES: u32 = 5; +/// Maximum retries for fetching a transaction batch. +const MAX_FETCH_RETRIES: u32 = 5; +/// Tolerance: if `gas_used` is within 1M of target, don't retry. +const MIN_TARGET_SLACK: u64 = 1_000_000; +/// Maximum gas to request in retries (10x target as safety cap). +const MAX_ADDITIONAL_GAS_MULTIPLIER: u64 = 10; + +/// Fetches a batch of transactions with retry logic. +/// +/// Returns `None` if all retries are exhausted. +async fn fetch_batch_with_retry( + collector: &TransactionCollector, + block: u64, +) -> Option { + for attempt in 1..=MAX_FETCH_RETRIES { + match collector.collect(block).await { + Ok(result) => return Some(result), + Err(e) => { + if attempt == MAX_FETCH_RETRIES { + warn!(attempt, error = %e, "Failed to fetch transactions after max retries"); + return None; + } + warn!(attempt, error = %e, "Failed to fetch transactions, retrying..."); + tokio::time::sleep(std::time::Duration::from_secs(1)).await; + } + } + } + None +} + +/// Outcome of a build attempt check. +enum RetryOutcome { + /// Payload is close enough to target gas. + Success, + /// Max retries reached, accept what we have. + MaxRetries, + /// Need more transactions with the specified gas amount. + NeedMore(u64), +} + +/// Buffer for receiving transaction batches from the fetcher. +/// +/// This abstracts over the channel to allow the main loop to request +/// batches on demand, including for retries. +struct TxBuffer { + receiver: mpsc::Receiver, +} + +impl TxBuffer { + const fn new(receiver: mpsc::Receiver) -> Self { + Self { receiver } + } + + /// Take the next available batch from the fetcher. + async fn take_batch(&mut self) -> Option { + self.receiver.recv().await + } } impl Command { @@ -312,19 +397,20 @@ impl Command { ) .await?; } else { - // Single payload - collect transactions and build + // Single payload - collect transactions and build with retry let tx_source = RpcTransactionSource::from_url(&self.rpc_url)?; let collector = TransactionCollector::new(tx_source, self.target_gas); - let (transactions, _total_gas, _next_block) = collector.collect(start_block).await?; + let result = collector.collect(start_block).await?; - if transactions.is_empty() { + if result.transactions.is_empty() { return Err(eyre::eyre!("No transactions collected")); } - self.execute_sequential( + self.execute_sequential_with_retry( &auth_provider, &testing_provider, - transactions, + &collector, + result, parent_hash, parent_timestamp, ) @@ -335,32 +421,34 @@ impl Command { Ok(()) } - /// Sequential execution path for single payload or no-execute mode. - async fn execute_sequential( + /// Sequential execution path with retry logic for underfilled payloads. + async fn execute_sequential_with_retry( &self, auth_provider: &RootProvider, testing_provider: &RootProvider, - transactions: Vec, + collector: &TransactionCollector, + initial_result: CollectionResult, mut parent_hash: B256, mut parent_timestamp: u64, ) -> eyre::Result<()> { - for i in 0..self.count { - info!( - payload = i + 1, - total = self.count, - parent_hash = %parent_hash, - parent_timestamp = parent_timestamp, - "Building payload via testing_buildBlockV1" - ); + let mut current_result = initial_result; + for i in 0..self.count { let built = self - .build_payload(testing_provider, &transactions, i, parent_hash, parent_timestamp) + .build_with_retry( + testing_provider, + collector, + &mut current_result, + i, + parent_hash, + parent_timestamp, + ) .await?; self.save_payload(&built)?; if self.execute || self.count > 1 { - info!(payload = i + 1, block_hash = %built.block_hash, "Executing payload (newPayload + FCU)"); + info!(payload = i + 1, block_hash = %built.block_hash, gas_used = built.gas_used, "Executing payload (newPayload + FCU)"); self.execute_payload_v4(auth_provider, built.envelope, parent_hash).await?; info!(payload = i + 1, "Payload executed successfully"); } @@ -371,7 +459,62 @@ impl Command { Ok(()) } - /// Pipelined execution - fetches transactions and builds payloads in background. + /// Build a payload with retry logic when `gas_used` is below target. + /// + /// Uses the ratio of `gas_used/gas_sent` to estimate how many more transactions + /// are needed to hit the target gas. + async fn build_with_retry( + &self, + testing_provider: &RootProvider, + collector: &TransactionCollector, + result: &mut CollectionResult, + index: u64, + parent_hash: B256, + parent_timestamp: u64, + ) -> eyre::Result { + for attempt in 1..=MAX_BUILD_RETRIES { + let tx_bytes: Vec = result.transactions.iter().map(|t| t.raw.clone()).collect(); + let gas_sent = result.gas_sent; + + info!( + payload = index + 1, + attempt, + tx_count = tx_bytes.len(), + gas_sent, + parent_hash = %parent_hash, + "Building payload via testing_buildBlockV1" + ); + + let built = Self::build_payload_static( + testing_provider, + &tx_bytes, + index, + parent_hash, + parent_timestamp, + ) + .await?; + + match self.check_retry_outcome(&built, index, attempt, gas_sent) { + RetryOutcome::Success | RetryOutcome::MaxRetries => return Ok(built), + RetryOutcome::NeedMore(additional_gas) => { + let additional = + collector.collect_gas(result.next_block, additional_gas).await?; + result.transactions.extend(additional.transactions); + result.gas_sent = result.gas_sent.saturating_add(additional.gas_sent); + result.next_block = additional.next_block; + } + } + } + + warn!(payload = index + 1, "Retry loop exited without returning a payload"); + Err(eyre::eyre!("build_with_retry exhausted retries without result")) + } + + /// Pipelined execution - fetches transactions in background, builds with retry. + /// + /// The fetcher continuously produces transaction batches. The main loop consumes them, + /// builds payloads with retry logic (requesting more transactions if underfilled), + /// and executes each payload before moving to the next. async fn execute_pipelined( &self, auth_provider: &RootProvider, @@ -380,180 +523,215 @@ impl Command { initial_parent_hash: B256, initial_parent_timestamp: u64, ) -> eyre::Result<()> { - // Create channel for transaction batches (one batch per payload) - let (tx_sender, mut tx_receiver) = mpsc::channel::>(self.prefetch_buffer); + // Create channel for transaction batches - fetcher sends CollectionResult + let (tx_sender, tx_receiver) = mpsc::channel::(self.prefetch_buffer); // Spawn background task to continuously fetch transaction batches let rpc_url = self.rpc_url.clone(); let target_gas = self.target_gas; - let count = self.count; let fetcher_handle = tokio::spawn(async move { let tx_source = match RpcTransactionSource::from_url(&rpc_url) { Ok(source) => source, Err(e) => { warn!(error = %e, "Failed to create transaction source"); - return; + return None; } }; let collector = TransactionCollector::new(tx_source, target_gas); let mut current_block = start_block; - for payload_idx in 0..count { - const MAX_RETRIES: u32 = 5; - let mut attempts = 0; - let result = loop { - attempts += 1; - match collector.collect(current_block).await { - Ok(res) => break Some(res), - Err(e) => { - if attempts >= MAX_RETRIES { - warn!(payload = payload_idx + 1, attempts, error = %e, "Failed to fetch transactions after max retries"); - break None; - } - warn!(payload = payload_idx + 1, attempts, error = %e, "Failed to fetch transactions, retrying..."); - tokio::time::sleep(std::time::Duration::from_secs(1)).await; - } - } - }; - - let Some((transactions, total_gas, next_block)) = result else { - break; - }; - + while let Some(batch) = fetch_batch_with_retry(&collector, current_block).await { info!( - payload = payload_idx + 1, - tx_count = transactions.len(), - total_gas, - blocks = format!("{}..{}", current_block, next_block), - "Fetched transactions" + tx_count = batch.transactions.len(), + gas_sent = batch.gas_sent, + blocks = format!("{}..{}", current_block, batch.next_block), + "Fetched transaction batch" ); - current_block = next_block; + current_block = batch.next_block; - if tx_sender.send(transactions).await.is_err() { + if tx_sender.send(batch).await.is_err() { break; } } + + Some(current_block) }); + // Transaction buffer: holds transactions from batches + any extras from retries + let mut tx_buffer = TxBuffer::new(tx_receiver); + let mut parent_hash = initial_parent_hash; let mut parent_timestamp = initial_parent_timestamp; - let mut pending_build: Option>> = None; for i in 0..self.count { - let is_last = i == self.count - 1; + // Get initial batch of transactions for this payload + let mut result = tx_buffer + .take_batch() + .await + .ok_or_else(|| eyre::eyre!("Transaction fetcher stopped unexpectedly"))?; - // Get current payload (either from pending build or build now) - let current_payload = if let Some(handle) = pending_build.take() { - handle.await?? - } else { - // First payload - wait for transactions and build synchronously - let transactions = tx_receiver - .recv() - .await - .ok_or_else(|| eyre::eyre!("Transaction fetcher stopped unexpectedly"))?; + if result.transactions.is_empty() { + return Err(eyre::eyre!("No transactions collected for payload {}", i + 1)); + } - if transactions.is_empty() { - return Err(eyre::eyre!("No transactions collected for payload {}", i + 1)); - } - - info!( - payload = i + 1, - total = self.count, - parent_hash = %parent_hash, - parent_timestamp = parent_timestamp, - tx_count = transactions.len(), - "Building payload via testing_buildBlockV1" - ); - self.build_payload( + // Build with retry - may need to request more transactions + let built = self + .build_with_retry_buffered( testing_provider, - &transactions, + &mut tx_buffer, + &mut result, i, parent_hash, parent_timestamp, ) - .await? - }; + .await?; - self.save_payload(¤t_payload)?; + self.save_payload(&built)?; - let current_block_hash = current_payload.block_hash; - let current_timestamp = current_payload.timestamp; + let current_block_hash = built.block_hash; + let current_timestamp = built.timestamp; - // Execute current payload first - info!(payload = i + 1, block_hash = %current_block_hash, "Executing payload (newPayload + FCU)"); - self.execute_payload_v4(auth_provider, current_payload.envelope, parent_hash).await?; + // Execute payload + info!(payload = i + 1, block_hash = %current_block_hash, gas_used = built.gas_used, "Executing payload (newPayload + FCU)"); + self.execute_payload_v4(auth_provider, built.envelope, parent_hash).await?; info!(payload = i + 1, "Payload executed successfully"); - // Start building next payload in background (if not last) - AFTER execution - if !is_last { - // Get transactions for next payload (should already be fetched or fetching) - let next_transactions = tx_receiver - .recv() - .await - .ok_or_else(|| eyre::eyre!("Transaction fetcher stopped unexpectedly"))?; - - if next_transactions.is_empty() { - return Err(eyre::eyre!("No transactions collected for payload {}", i + 2)); - } - - let testing_provider = testing_provider.clone(); - let next_index = i + 1; - let total = self.count; - - pending_build = Some(tokio::spawn(async move { - info!( - payload = next_index + 1, - total = total, - parent_hash = %current_block_hash, - parent_timestamp = current_timestamp, - tx_count = next_transactions.len(), - "Building payload via testing_buildBlockV1" - ); - - Self::build_payload_static( - &testing_provider, - &next_transactions, - next_index, - current_block_hash, - current_timestamp, - ) - .await - })); - } - parent_hash = current_block_hash; parent_timestamp = current_timestamp; } // Clean up the fetcher task - drop(tx_receiver); + drop(tx_buffer); let _ = fetcher_handle.await; Ok(()) } - /// Build a single payload via `testing_buildBlockV1`. - async fn build_payload( + /// Build a payload with retry logic, using the buffered transaction source. + async fn build_with_retry_buffered( &self, testing_provider: &RootProvider, - transactions: &[Bytes], + tx_buffer: &mut TxBuffer, + result: &mut CollectionResult, index: u64, parent_hash: B256, parent_timestamp: u64, ) -> eyre::Result { - Self::build_payload_static( - testing_provider, - transactions, - index, - parent_hash, - parent_timestamp, - ) - .await + for attempt in 1..=MAX_BUILD_RETRIES { + let tx_bytes: Vec = result.transactions.iter().map(|t| t.raw.clone()).collect(); + let gas_sent = result.gas_sent; + + info!( + payload = index + 1, + attempt, + tx_count = tx_bytes.len(), + gas_sent, + parent_hash = %parent_hash, + "Building payload via testing_buildBlockV1" + ); + + let built = Self::build_payload_static( + testing_provider, + &tx_bytes, + index, + parent_hash, + parent_timestamp, + ) + .await?; + + match self.check_retry_outcome(&built, index, attempt, gas_sent) { + RetryOutcome::Success | RetryOutcome::MaxRetries => return Ok(built), + RetryOutcome::NeedMore(additional_gas) => { + let mut collected_gas = 0u64; + while collected_gas < additional_gas { + if let Some(batch) = tx_buffer.take_batch().await { + collected_gas += batch.gas_sent; + result.transactions.extend(batch.transactions); + result.gas_sent = result.gas_sent.saturating_add(batch.gas_sent); + result.next_block = batch.next_block; + } else { + warn!("Transaction fetcher exhausted, proceeding with available transactions"); + break; + } + } + } + } + } + + warn!(payload = index + 1, "Retry loop exited without returning a payload"); + Err(eyre::eyre!("build_with_retry_buffered exhausted retries without result")) } - /// Static version for use in spawned tasks. + /// Determines the outcome of a build attempt. + fn check_retry_outcome( + &self, + built: &BuiltPayload, + index: u64, + attempt: u32, + gas_sent: u64, + ) -> RetryOutcome { + let gas_used = built.gas_used; + + if gas_used + MIN_TARGET_SLACK >= self.target_gas { + info!( + payload = index + 1, + gas_used, + target_gas = self.target_gas, + attempts = attempt, + "Payload built successfully" + ); + return RetryOutcome::Success; + } + + if attempt == MAX_BUILD_RETRIES { + warn!( + payload = index + 1, + gas_used, + target_gas = self.target_gas, + gas_sent, + "Underfilled after max retries, accepting payload" + ); + return RetryOutcome::MaxRetries; + } + + if gas_used == 0 { + warn!( + payload = index + 1, + "Zero gas used in payload, requesting fixed chunk of additional transactions" + ); + return RetryOutcome::NeedMore(self.target_gas); + } + + let gas_sent_needed_total = + (self.target_gas as u128 * gas_sent as u128).div_ceil(gas_used as u128) as u64; + let additional = gas_sent_needed_total.saturating_sub(gas_sent); + let additional = additional.min(self.target_gas * MAX_ADDITIONAL_GAS_MULTIPLIER); + + if additional == 0 { + info!( + payload = index + 1, + gas_used, + target_gas = self.target_gas, + "No additional transactions needed based on ratio" + ); + return RetryOutcome::Success; + } + + let ratio = gas_used as f64 / gas_sent as f64; + info!( + payload = index + 1, + gas_used, + gas_sent, + ratio = format!("{:.4}", ratio), + additional_gas = additional, + "Underfilled, collecting more transactions for retry" + ); + RetryOutcome::NeedMore(additional) + } + + /// Build a single payload via `testing_buildBlockV1`. async fn build_payload_static( testing_provider: &RootProvider, transactions: &[Bytes], @@ -591,8 +769,9 @@ impl Command { let block_hash = inner.block_hash; let block_number = inner.block_number; let timestamp = inner.timestamp; + let gas_used = inner.gas_used; - Ok(BuiltPayload { block_number, envelope: v4_envelope, block_hash, timestamp }) + Ok(BuiltPayload { block_number, envelope: v4_envelope, block_hash, timestamp, gas_used }) } /// Save a payload to disk.