feat(re-execute): work-stealing parallelization (#22242)

Co-authored-by: Amp <amp@ampcode.com>
Co-authored-by: Arsenii Kulikov <klkvrr@gmail.com>
This commit is contained in:
Georgios Konstantopoulos
2026-02-23 07:39:24 -08:00
committed by GitHub
parent ca47a7e9f9
commit 285c325d71
2 changed files with 120 additions and 84 deletions

View File

@@ -20,7 +20,10 @@ use reth_provider::{
use reth_revm::database::StateProviderDatabase;
use reth_stages::stages::calculate_gas_used_from_headers;
use std::{
sync::Arc,
sync::{
atomic::{AtomicU64, Ordering},
Arc,
},
time::{Duration, Instant},
};
use tokio::{sync::mpsc, task::JoinSet};
@@ -46,6 +49,10 @@ pub struct Command<C: ChainSpecParser> {
#[arg(long)]
num_tasks: Option<u64>,
/// Number of blocks each worker processes before grabbing the next chunk.
#[arg(long, default_value = "5000")]
blocks_per_chunk: u64,
/// Continues with execution when an invalid block is encountered and collects these blocks.
#[arg(long)]
skip_invalid_blocks: bool,
@@ -92,12 +99,10 @@ impl<C: ChainSpecParser<ChainSpec: EthChainSpec + Hardforks + EthereumHardforks>
std::thread::available_parallelism().map(|n| n.get() as u64).unwrap_or(10)
});
let total_blocks = max_block - min_block;
let total_gas = calculate_gas_used_from_headers(
&provider_factory.static_file_provider(),
min_block..=max_block,
)?;
let blocks_per_task = total_blocks / num_tasks;
let db_at = {
let provider_factory = provider_factory.clone();
@@ -109,18 +114,17 @@ impl<C: ChainSpecParser<ChainSpec: EthChainSpec + Hardforks + EthereumHardforks>
};
let skip_invalid_blocks = self.skip_invalid_blocks;
let blocks_per_chunk = self.blocks_per_chunk;
let (stats_tx, mut stats_rx) = mpsc::unbounded_channel();
let (info_tx, mut info_rx) = mpsc::unbounded_channel();
let cancellation = CancellationToken::new();
let _guard = cancellation.drop_guard();
let mut tasks = JoinSet::new();
for i in 0..num_tasks {
let start_block = min_block + i * blocks_per_task;
let end_block =
if i == num_tasks - 1 { max_block } else { start_block + blocks_per_task };
// Shared counter for work stealing: workers atomically grab the next chunk of blocks.
let next_block = Arc::new(AtomicU64::new(min_block));
// Spawn the blocking worker tasks that execute blocks
let mut tasks = JoinSet::new();
for _ in 0..num_tasks {
let provider_factory = provider_factory.clone();
let evm_config = components.evm_config().clone();
let consensus = components.consensus().clone();
@@ -128,95 +132,122 @@ impl<C: ChainSpecParser<ChainSpec: EthChainSpec + Hardforks + EthereumHardforks>
let stats_tx = stats_tx.clone();
let info_tx = info_tx.clone();
let cancellation = cancellation.clone();
let next_block = Arc::clone(&next_block);
tasks.spawn_blocking(move || {
let mut executor = evm_config.batch_executor(db_at(start_block - 1));
let mut executor_created = Instant::now();
let executor_lifetime = Duration::from_secs(120);
'blocks: for block in start_block..end_block {
loop {
if cancellation.is_cancelled() {
// exit if the program is being terminated
break
break;
}
let block = provider_factory
.recovered_block(block.into(), TransactionVariant::NoHash)?
.unwrap();
// Atomically grab the next chunk of blocks.
let chunk_start =
next_block.fetch_add(blocks_per_chunk, Ordering::Relaxed);
if chunk_start >= max_block {
break;
}
let chunk_end = (chunk_start + blocks_per_chunk).min(max_block);
let result = match executor.execute_one(&block) {
Ok(result) => result,
Err(err) => {
if skip_invalid_blocks {
executor = evm_config.batch_executor(db_at(block.number()));
let _ = info_tx.send((block, eyre::Report::new(err)));
continue
}
return Err(err.into())
let mut executor = evm_config.batch_executor(db_at(chunk_start - 1));
let mut executor_created = Instant::now();
'blocks: for block in chunk_start..chunk_end {
if cancellation.is_cancelled() {
break;
}
};
if let Err(err) = consensus
.validate_block_post_execution(&block, &result, None)
.wrap_err_with(|| {
format!("Failed to validate block {} {}", block.number(), block.hash())
})
{
let correct_receipts =
provider_factory.receipts_by_block(block.number().into())?.unwrap();
let block = provider_factory
.recovered_block(block.into(), TransactionVariant::NoHash)?
.unwrap();
for (i, (receipt, correct_receipt)) in
result.receipts.iter().zip(correct_receipts.iter()).enumerate()
{
if receipt != correct_receipt {
let tx_hash = block.body().transactions()[i].tx_hash();
error!(
?receipt,
?correct_receipt,
index = i,
?tx_hash,
"Invalid receipt"
);
let expected_gas_used = correct_receipt.cumulative_gas_used() -
if i == 0 {
0
} else {
correct_receipts[i - 1].cumulative_gas_used()
};
let got_gas_used = receipt.cumulative_gas_used() -
if i == 0 {
0
} else {
result.receipts[i - 1].cumulative_gas_used()
};
if got_gas_used != expected_gas_used {
let mismatch = GotExpected {
expected: expected_gas_used,
got: got_gas_used,
};
error!(number=?block.number(), ?mismatch, "Gas usage mismatch");
if skip_invalid_blocks {
executor = evm_config.batch_executor(db_at(block.number()));
let _ = info_tx.send((block, err));
continue 'blocks;
}
return Err(err);
let result = match executor.execute_one(&block) {
Ok(result) => result,
Err(err) => {
if skip_invalid_blocks {
executor =
evm_config.batch_executor(db_at(block.number()));
let _ =
info_tx.send((block, eyre::Report::new(err)));
continue
}
} else {
continue;
return Err(err.into())
}
};
if let Err(err) = consensus
.validate_block_post_execution(&block, &result, None)
.wrap_err_with(|| {
format!(
"Failed to validate block {} {}",
block.number(),
block.hash()
)
})
{
let correct_receipts = provider_factory
.receipts_by_block(block.number().into())?
.unwrap();
for (i, (receipt, correct_receipt)) in
result.receipts.iter().zip(correct_receipts.iter()).enumerate()
{
if receipt != correct_receipt {
let tx_hash =
block.body().transactions()[i].tx_hash();
error!(
?receipt,
?correct_receipt,
index = i,
?tx_hash,
"Invalid receipt"
);
let expected_gas_used =
correct_receipt.cumulative_gas_used() -
if i == 0 {
0
} else {
correct_receipts[i - 1]
.cumulative_gas_used()
};
let got_gas_used = receipt.cumulative_gas_used() -
if i == 0 {
0
} else {
result.receipts[i - 1].cumulative_gas_used()
};
if got_gas_used != expected_gas_used {
let mismatch = GotExpected {
expected: expected_gas_used,
got: got_gas_used,
};
error!(number=?block.number(), ?mismatch, "Gas usage mismatch");
if skip_invalid_blocks {
executor = evm_config
.batch_executor(db_at(block.number()));
let _ = info_tx.send((block, err));
continue 'blocks;
}
return Err(err);
}
} else {
continue;
}
}
return Err(err);
}
let _ = stats_tx.send(block.gas_used());
return Err(err);
}
let _ = stats_tx.send(block.gas_used());
// Reset DB once in a while to avoid OOM or read tx timeouts
if executor.size_hint() > 1_000_000 ||
executor_created.elapsed() > executor_lifetime
{
executor = evm_config.batch_executor(db_at(block.number()));
executor_created = Instant::now();
// Reset DB once in a while to avoid OOM or read tx timeouts
if executor.size_hint() > 1_000_000 ||
executor_created.elapsed() > executor_lifetime
{
executor =
evm_config.batch_executor(db_at(block.number()));
executor_created = Instant::now();
}
}
}

View File

@@ -132,6 +132,11 @@ Storage:
--num-tasks <NUM_TASKS>
Number of tasks to run in parallel. Defaults to the number of available CPUs
--blocks-per-chunk <BLOCKS_PER_CHUNK>
Number of blocks each worker processes before grabbing the next chunk
[default: 5000]
--skip-invalid-blocks
Continues with execution when an invalid block is encountered and collects these blocks