diff --git a/crates/cli/commands/src/re_execute.rs b/crates/cli/commands/src/re_execute.rs index 8086759227..61e39ae81c 100644 --- a/crates/cli/commands/src/re_execute.rs +++ b/crates/cli/commands/src/re_execute.rs @@ -20,7 +20,10 @@ use reth_provider::{ use reth_revm::database::StateProviderDatabase; use reth_stages::stages::calculate_gas_used_from_headers; use std::{ - sync::Arc, + sync::{ + atomic::{AtomicU64, Ordering}, + Arc, + }, time::{Duration, Instant}, }; use tokio::{sync::mpsc, task::JoinSet}; @@ -46,6 +49,10 @@ pub struct Command { #[arg(long)] num_tasks: Option, + /// Number of blocks each worker processes before grabbing the next chunk. + #[arg(long, default_value = "5000")] + blocks_per_chunk: u64, + /// Continues with execution when an invalid block is encountered and collects these blocks. #[arg(long)] skip_invalid_blocks: bool, @@ -92,12 +99,10 @@ impl std::thread::available_parallelism().map(|n| n.get() as u64).unwrap_or(10) }); - let total_blocks = max_block - min_block; let total_gas = calculate_gas_used_from_headers( &provider_factory.static_file_provider(), min_block..=max_block, )?; - let blocks_per_task = total_blocks / num_tasks; let db_at = { let provider_factory = provider_factory.clone(); @@ -109,18 +114,17 @@ impl }; let skip_invalid_blocks = self.skip_invalid_blocks; + let blocks_per_chunk = self.blocks_per_chunk; let (stats_tx, mut stats_rx) = mpsc::unbounded_channel(); let (info_tx, mut info_rx) = mpsc::unbounded_channel(); let cancellation = CancellationToken::new(); let _guard = cancellation.drop_guard(); - let mut tasks = JoinSet::new(); - for i in 0..num_tasks { - let start_block = min_block + i * blocks_per_task; - let end_block = - if i == num_tasks - 1 { max_block } else { start_block + blocks_per_task }; + // Shared counter for work stealing: workers atomically grab the next chunk of blocks. + let next_block = Arc::new(AtomicU64::new(min_block)); - // Spawn thread executing blocks + let mut tasks = JoinSet::new(); + for _ in 0..num_tasks { let provider_factory = provider_factory.clone(); let evm_config = components.evm_config().clone(); let consensus = components.consensus().clone(); @@ -128,95 +132,122 @@ impl let stats_tx = stats_tx.clone(); let info_tx = info_tx.clone(); let cancellation = cancellation.clone(); + let next_block = Arc::clone(&next_block); tasks.spawn_blocking(move || { - let mut executor = evm_config.batch_executor(db_at(start_block - 1)); - let mut executor_created = Instant::now(); let executor_lifetime = Duration::from_secs(120); - 'blocks: for block in start_block..end_block { + loop { if cancellation.is_cancelled() { - // exit if the program is being terminated - break + break; } - let block = provider_factory - .recovered_block(block.into(), TransactionVariant::NoHash)? - .unwrap(); + // Atomically grab the next chunk of blocks. + let chunk_start = + next_block.fetch_add(blocks_per_chunk, Ordering::Relaxed); + if chunk_start >= max_block { + break; + } + let chunk_end = (chunk_start + blocks_per_chunk).min(max_block); - let result = match executor.execute_one(&block) { - Ok(result) => result, - Err(err) => { - if skip_invalid_blocks { - executor = evm_config.batch_executor(db_at(block.number())); - let _ = info_tx.send((block, eyre::Report::new(err))); - continue - } - return Err(err.into()) + let mut executor = evm_config.batch_executor(db_at(chunk_start - 1)); + let mut executor_created = Instant::now(); + + 'blocks: for block in chunk_start..chunk_end { + if cancellation.is_cancelled() { + break; } - }; - if let Err(err) = consensus - .validate_block_post_execution(&block, &result, None) - .wrap_err_with(|| { - format!("Failed to validate block {} {}", block.number(), block.hash()) - }) - { - let correct_receipts = - provider_factory.receipts_by_block(block.number().into())?.unwrap(); + let block = provider_factory + .recovered_block(block.into(), TransactionVariant::NoHash)? + .unwrap(); - for (i, (receipt, correct_receipt)) in - result.receipts.iter().zip(correct_receipts.iter()).enumerate() - { - if receipt != correct_receipt { - let tx_hash = block.body().transactions()[i].tx_hash(); - error!( - ?receipt, - ?correct_receipt, - index = i, - ?tx_hash, - "Invalid receipt" - ); - let expected_gas_used = correct_receipt.cumulative_gas_used() - - if i == 0 { - 0 - } else { - correct_receipts[i - 1].cumulative_gas_used() - }; - let got_gas_used = receipt.cumulative_gas_used() - - if i == 0 { - 0 - } else { - result.receipts[i - 1].cumulative_gas_used() - }; - if got_gas_used != expected_gas_used { - let mismatch = GotExpected { - expected: expected_gas_used, - got: got_gas_used, - }; - - error!(number=?block.number(), ?mismatch, "Gas usage mismatch"); - if skip_invalid_blocks { - executor = evm_config.batch_executor(db_at(block.number())); - let _ = info_tx.send((block, err)); - continue 'blocks; - } - return Err(err); + let result = match executor.execute_one(&block) { + Ok(result) => result, + Err(err) => { + if skip_invalid_blocks { + executor = + evm_config.batch_executor(db_at(block.number())); + let _ = + info_tx.send((block, eyre::Report::new(err))); + continue } - } else { - continue; + return Err(err.into()) } + }; + + if let Err(err) = consensus + .validate_block_post_execution(&block, &result, None) + .wrap_err_with(|| { + format!( + "Failed to validate block {} {}", + block.number(), + block.hash() + ) + }) + { + let correct_receipts = provider_factory + .receipts_by_block(block.number().into())? + .unwrap(); + + for (i, (receipt, correct_receipt)) in + result.receipts.iter().zip(correct_receipts.iter()).enumerate() + { + if receipt != correct_receipt { + let tx_hash = + block.body().transactions()[i].tx_hash(); + error!( + ?receipt, + ?correct_receipt, + index = i, + ?tx_hash, + "Invalid receipt" + ); + let expected_gas_used = + correct_receipt.cumulative_gas_used() - + if i == 0 { + 0 + } else { + correct_receipts[i - 1] + .cumulative_gas_used() + }; + let got_gas_used = receipt.cumulative_gas_used() - + if i == 0 { + 0 + } else { + result.receipts[i - 1].cumulative_gas_used() + }; + if got_gas_used != expected_gas_used { + let mismatch = GotExpected { + expected: expected_gas_used, + got: got_gas_used, + }; + + error!(number=?block.number(), ?mismatch, "Gas usage mismatch"); + if skip_invalid_blocks { + executor = evm_config + .batch_executor(db_at(block.number())); + let _ = info_tx.send((block, err)); + continue 'blocks; + } + return Err(err); + } + } else { + continue; + } + } + + return Err(err); } + let _ = stats_tx.send(block.gas_used()); - return Err(err); - } - let _ = stats_tx.send(block.gas_used()); - - // Reset DB once in a while to avoid OOM or read tx timeouts - if executor.size_hint() > 1_000_000 || - executor_created.elapsed() > executor_lifetime - { - executor = evm_config.batch_executor(db_at(block.number())); - executor_created = Instant::now(); + // Reset DB once in a while to avoid OOM or read tx timeouts + if executor.size_hint() > 1_000_000 || + executor_created.elapsed() > executor_lifetime + { + executor = + evm_config.batch_executor(db_at(block.number())); + executor_created = Instant::now(); + } } } diff --git a/docs/vocs/docs/pages/cli/reth/re-execute.mdx b/docs/vocs/docs/pages/cli/reth/re-execute.mdx index e6b57479dd..b5bdafe649 100644 --- a/docs/vocs/docs/pages/cli/reth/re-execute.mdx +++ b/docs/vocs/docs/pages/cli/reth/re-execute.mdx @@ -132,6 +132,11 @@ Storage: --num-tasks Number of tasks to run in parallel. Defaults to the number of available CPUs + --blocks-per-chunk + Number of blocks each worker processes before grabbing the next chunk + + [default: 5000] + --skip-invalid-blocks Continues with execution when an invalid block is encountered and collects these blocks