feat: re-execute command (#17330)

Arsenii Kulikov
2025-07-10 13:21:51 +03:00
committed by GitHub
parent ea944fa75a
commit 1a7c335a60
12 changed files with 427 additions and 9 deletions

Cargo.lock (generated)

@@ -7403,6 +7403,7 @@ dependencies = [
"reth-provider",
"reth-prune",
"reth-prune-types",
"reth-revm",
"reth-stages",
"reth-stages-types",
"reth-static-file",


@@ -43,6 +43,7 @@ reth-ethereum-primitives = { workspace = true, optional = true }
reth-provider.workspace = true
reth-prune.workspace = true
reth-prune-types = { workspace = true, optional = true }
reth-revm.workspace = true
reth-stages.workspace = true
reth-stages-types = { workspace = true, optional = true }
reth-static-file-types = { workspace = true, features = ["clap"] }


@@ -260,3 +260,18 @@ where
&self.1
}
}
/// Helper trait alias for an [`FnOnce`] producing [`CliNodeComponents`].
pub trait CliComponentsBuilder<N: CliNodeTypes>:
FnOnce(Arc<N::ChainSpec>) -> Self::Components
{
type Components: CliNodeComponents<N>;
}
impl<N: CliNodeTypes, F, Comp> CliComponentsBuilder<N> for F
where
F: FnOnce(Arc<N::ChainSpec>) -> Comp,
Comp: CliNodeComponents<N>,
{
type Components = Comp;
}
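The alias pattern above lets callers pass a plain closure wherever a `CliComponentsBuilder` is expected. A minimal self-contained sketch of the same `FnOnce`-supertrait trick, using stand-in types rather than the real reth traits:

```rust
use std::sync::Arc;

// Stand-ins for the real reth types, just to isolate the pattern.
struct ChainSpec;
struct MyComponents;

// Any `FnOnce(Arc<ChainSpec>) -> C` automatically implements the alias,
// with `C` surfaced as an associated type usable in bounds.
trait ComponentsBuilder: FnOnce(Arc<ChainSpec>) -> Self::Components {
    type Components;
}

impl<F, C> ComponentsBuilder for F
where
    F: FnOnce(Arc<ChainSpec>) -> C,
{
    type Components = C;
}

fn run(builder: impl ComponentsBuilder<Components = MyComponents>) {
    // The builder is consumed exactly once, as `FnOnce` guarantees.
    let _components = builder(Arc::new(ChainSpec));
}

fn main() {
    // A bare closure satisfies the trait without ever naming it.
    run(|_spec: Arc<ChainSpec>| MyComponents);
}
```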


@@ -22,6 +22,7 @@ pub mod launcher;
pub mod node;
pub mod p2p;
pub mod prune;
pub mod re_execute;
pub mod recover;
pub mod stage;
#[cfg(feature = "arbitrary")]


@@ -0,0 +1,222 @@
//! Re-execute blocks from database in parallel.
use crate::common::{
AccessRights, CliComponentsBuilder, CliNodeComponents, CliNodeTypes, Environment,
EnvironmentArgs,
};
use alloy_consensus::{BlockHeader, TxReceipt};
use clap::Parser;
use eyre::WrapErr;
use reth_chainspec::{EthChainSpec, EthereumHardforks, Hardforks};
use reth_cli::chainspec::ChainSpecParser;
use reth_consensus::FullConsensus;
use reth_evm::{execute::Executor, ConfigureEvm};
use reth_primitives_traits::{format_gas_throughput, BlockBody, GotExpected, SignedTransaction};
use reth_provider::{
BlockNumReader, BlockReader, ChainSpecProvider, DatabaseProviderFactory, ReceiptProvider,
StaticFileProviderFactory, TransactionVariant,
};
use reth_revm::database::StateProviderDatabase;
use reth_stages::stages::calculate_gas_used_from_headers;
use std::{
sync::Arc,
time::{Duration, Instant},
};
use tokio::{sync::mpsc, task::JoinSet};
use tracing::*;
/// `reth re-execute` command
///
/// Re-execute blocks in parallel to verify historical sync correctness.
#[derive(Debug, Parser)]
pub struct Command<C: ChainSpecParser> {
#[command(flatten)]
env: EnvironmentArgs<C>,
/// The height to start at.
#[arg(long, default_value = "1")]
from: u64,
/// The height to end at. Defaults to the latest block.
#[arg(long)]
to: Option<u64>,
/// Number of tasks to run in parallel
#[arg(long, default_value = "10")]
num_tasks: u64,
}
impl<C: ChainSpecParser> Command<C> {
/// Returns the underlying chain being used to run this command
pub fn chain_spec(&self) -> Option<&Arc<C::ChainSpec>> {
Some(&self.env.chain)
}
}
impl<C: ChainSpecParser<ChainSpec: EthChainSpec + Hardforks + EthereumHardforks>> Command<C> {
/// Execute `re-execute` command
pub async fn execute<N>(self, components: impl CliComponentsBuilder<N>) -> eyre::Result<()>
where
N: CliNodeTypes<ChainSpec = C::ChainSpec>,
{
let Environment { provider_factory, .. } = self.env.init::<N>(AccessRights::RO)?;
let provider = provider_factory.database_provider_ro()?;
let components = components(provider_factory.chain_spec());
let min_block = self.from;
let max_block = self.to.unwrap_or(provider.best_block_number()?);
let total_blocks = max_block - min_block;
let total_gas = calculate_gas_used_from_headers(
&provider_factory.static_file_provider(),
min_block..=max_block,
)?;
let blocks_per_task = total_blocks / self.num_tasks;
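// Example split: --from 1 --to 1_000_001 with 10 tasks gives 100_000 blocks
// per task; the last task additionally absorbs the division remainder.
// `db_at(n)` builds a state database at the historical state after block `n`,
// i.e. the pre-state for executing block `n + 1`.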
let db_at = {
let provider_factory = provider_factory.clone();
move |block_number: u64| {
StateProviderDatabase(
provider_factory.history_by_block_number(block_number).unwrap(),
)
}
};
let (stats_tx, mut stats_rx) = mpsc::unbounded_channel();
let mut tasks = JoinSet::new();
for i in 0..self.num_tasks {
let start_block = min_block + i * blocks_per_task;
let end_block =
if i == self.num_tasks - 1 { max_block } else { start_block + blocks_per_task };
// Spawn thread executing blocks
let provider_factory = provider_factory.clone();
let evm_config = components.evm_config().clone();
let consensus = components.consensus().clone();
let db_at = db_at.clone();
let stats_tx = stats_tx.clone();
tasks.spawn_blocking(move || {
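// Seed the batch executor with the state after `start_block - 1`, i.e. the
// pre-state of `start_block` (this is why `--from` defaults to 1, not 0).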
let mut executor = evm_config.batch_executor(db_at(start_block - 1));
for block in start_block..end_block {
let block = provider_factory
.recovered_block(block.into(), TransactionVariant::NoHash)?
.unwrap();
let result = executor.execute_one(&block)?;
if let Err(err) = consensus
.validate_block_post_execution(&block, &result)
.wrap_err_with(|| format!("Failed to validate block {}", block.number()))
{
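// Post-execution validation failed: diff the locally computed receipts
// against those stored in the database to pinpoint the first diverging
// transaction before surfacing the error.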
let correct_receipts =
provider_factory.receipts_by_block(block.number().into())?.unwrap();
for (i, (receipt, correct_receipt)) in
result.receipts.iter().zip(correct_receipts.iter()).enumerate()
{
if receipt != correct_receipt {
let tx_hash = block.body().transactions()[i].tx_hash();
error!(
?receipt,
?correct_receipt,
index = i,
?tx_hash,
"Invalid receipt"
);
let expected_gas_used = correct_receipt.cumulative_gas_used() -
if i == 0 {
0
} else {
correct_receipts[i - 1].cumulative_gas_used()
};
let got_gas_used = receipt.cumulative_gas_used() -
if i == 0 {
0
} else {
result.receipts[i - 1].cumulative_gas_used()
};
if got_gas_used != expected_gas_used {
let mismatch = GotExpected {
expected: expected_gas_used,
got: got_gas_used,
};
error!(number=?block.number(), ?mismatch, "Gas usage mismatch");
return Err(err);
}
} else {
continue;
}
}
return Err(err);
}
let _ = stats_tx.send(block.gas_used());
// Reset DB once in a while to avoid OOM
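// (`size_hint` approximates how much state the executor has buffered.)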
if executor.size_hint() > 1_000_000 {
executor = evm_config.batch_executor(db_at(block.number()));
}
}
eyre::Ok(())
});
}
let instant = Instant::now();
let mut total_executed_blocks = 0;
let mut total_executed_gas = 0;
let mut last_logged_gas = 0;
let mut last_logged_blocks = 0;
let mut last_logged_time = Instant::now();
let mut interval = tokio::time::interval(Duration::from_secs(10));
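// Event loop: drain per-block gas stats from the workers, fail fast on the
// first worker error, and log throughput/progress every 10 seconds.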
loop {
tokio::select! {
Some(gas_used) = stats_rx.recv() => {
total_executed_blocks += 1;
total_executed_gas += gas_used;
}
result = tasks.join_next() => {
if let Some(result) = result {
if matches!(result, Err(_) | Ok(Err(_))) {
error!(?result);
return Err(eyre::eyre!("Re-execution failed: {result:?}"));
}
} else {
break;
}
}
_ = interval.tick() => {
let blocks_executed = total_executed_blocks - last_logged_blocks;
let gas_executed = total_executed_gas - last_logged_gas;
if blocks_executed > 0 {
let progress = 100.0 * total_executed_gas as f64 / total_gas as f64;
info!(
throughput=?format_gas_throughput(gas_executed, last_logged_time.elapsed()),
progress=format!("{progress:.2}%"),
"Executed {blocks_executed} blocks"
);
}
last_logged_blocks = total_executed_blocks;
last_logged_gas = total_executed_gas;
last_logged_time = Instant::now();
}
}
}
info!(
start_block = min_block,
end_block = max_block,
throughput=?format_gas_throughput(total_executed_gas, instant.elapsed()),
"Re-executed successfully"
);
Ok(())
}
}
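The block-range split above (an exclusive upper bound per task, with the last task taking the remainder) is the part most worth sanity-checking. A standalone sketch of the same arithmetic, with a hypothetical `partition` helper that is not part of this commit:

```rust
/// Hypothetical helper mirroring the task split in `Command::execute`:
/// each of `num_tasks` workers gets `(max - min) / num_tasks` blocks as a
/// half-open range, and the last worker also absorbs the remainder.
fn partition(min_block: u64, max_block: u64, num_tasks: u64) -> Vec<(u64, u64)> {
    let blocks_per_task = (max_block - min_block) / num_tasks;
    (0..num_tasks)
        .map(|i| {
            let start = min_block + i * blocks_per_task;
            let end =
                if i == num_tasks - 1 { max_block } else { start + blocks_per_task };
            (start, end) // end is exclusive, matching `start_block..end_block`
        })
        .collect()
}

fn main() {
    // from=1, to=103, 4 tasks: 25 blocks each, last range absorbs the 2 extra.
    assert_eq!(
        partition(1, 103, 4),
        vec![(1, 26), (26, 51), (51, 76), (76, 103)]
    );
}
```

Note the half-open ranges mean `max_block` itself is never executed, consistent with `total_blocks = max_block - min_block` in the command.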


@@ -8,7 +8,7 @@ use reth_cli_commands::{
config_cmd, db, download, dump_genesis, import, import_era, init_cmd, init_state,
launcher::FnLauncher,
node::{self, NoArgs},
-p2p, prune, recover, stage,
+p2p, prune, re_execute, recover, stage,
};
use reth_cli_runner::CliRunner;
use reth_db::DatabaseEnv;
@@ -186,6 +186,9 @@ impl<C: ChainSpecParser<ChainSpec = ChainSpec>, Ext: clap::Args + fmt::Debug> Cl
runner.run_command_until_exit(|ctx| command.execute::<EthereumNode>(ctx))
}
Commands::Prune(command) => runner.run_until_ctrl_c(command.execute::<EthereumNode>()),
Commands::ReExecute(command) => {
runner.run_until_ctrl_c(command.execute::<EthereumNode>(components))
}
}
}
@@ -248,6 +251,9 @@ pub enum Commands<C: ChainSpecParser, Ext: clap::Args + fmt::Debug> {
/// Prune according to the configuration without any limits
#[command(name = "prune")]
Prune(prune::PruneCommand<C>),
/// Re-execute blocks in parallel to verify historical sync correctness.
#[command(name = "re-execute")]
ReExecute(re_execute::Command<C>),
}
impl<C: ChainSpecParser, Ext: clap::Args + fmt::Debug> Commands<C, Ext> {
@@ -270,6 +276,7 @@ impl<C: ChainSpecParser, Ext: clap::Args + fmt::Debug> Commands<C, Ext> {
Self::Debug(cmd) => cmd.chain_spec(),
Self::Recover(cmd) => cmd.chain_spec(),
Self::Prune(cmd) => cmd.chain_spec(),
Self::ReExecute(cmd) => cmd.chain_spec(),
}
}
}


@@ -8,7 +8,7 @@ use reth_optimism_chainspec::OpChainSpec;
use reth_optimism_consensus::OpBeaconConsensus;
use reth_optimism_node::{OpExecutorProvider, OpNode};
use reth_tracing::{FileWorkerGuard, Layers};
-use std::fmt;
+use std::{fmt, sync::Arc};
use tracing::info;
/// A wrapper around a parsed CLI that handles command execution.
@@ -65,6 +65,10 @@ where
// Install the prometheus recorder to be sure to record all metrics
let _ = install_prometheus_recorder();
let components = |spec: Arc<OpChainSpec>| {
(OpExecutorProvider::optimism(spec.clone()), OpBeaconConsensus::new(spec))
};
match self.cli.command {
Commands::Node(command) => {
runner.run_command_until_exit(|ctx| command.execute(ctx, launcher))
@@ -83,11 +87,9 @@ where
}
Commands::DumpGenesis(command) => runner.run_blocking_until_ctrl_c(command.execute()),
Commands::Db(command) => runner.run_blocking_until_ctrl_c(command.execute::<OpNode>()),
-Commands::Stage(command) => runner.run_command_until_exit(|ctx| {
-command.execute::<OpNode, _>(ctx, |spec| {
-(OpExecutorProvider::optimism(spec.clone()), OpBeaconConsensus::new(spec))
-})
-}),
+Commands::Stage(command) => {
+runner.run_command_until_exit(|ctx| command.execute::<OpNode, _>(ctx, components))
+}
Commands::P2P(command) => runner.run_until_ctrl_c(command.execute::<OpNode>()),
Commands::Config(command) => runner.run_until_ctrl_c(command.execute()),
Commands::Recover(command) => {
@@ -96,6 +98,9 @@ where
Commands::Prune(command) => runner.run_until_ctrl_c(command.execute::<OpNode>()),
#[cfg(feature = "dev")]
Commands::TestVectors(command) => runner.run_until_ctrl_c(command.execute()),
Commands::ReExecute(command) => {
runner.run_until_ctrl_c(command.execute::<OpNode>(components))
}
}
}


@@ -7,7 +7,7 @@ use reth_cli::chainspec::ChainSpecParser;
use reth_cli_commands::{
config_cmd, db, dump_genesis, init_cmd,
node::{self, NoArgs},
-p2p, prune, recover, stage,
+p2p, prune, re_execute, recover, stage,
};
use std::{fmt, sync::Arc};
@@ -62,6 +62,9 @@ pub enum Commands<Spec: ChainSpecParser = OpChainSpecParser, Ext: clap::Args + f
#[cfg(feature = "dev")]
#[command(name = "test-vectors")]
TestVectors(test_vectors::Command),
/// Re-execute blocks in parallel to verify historical sync correctness.
#[command(name = "re-execute")]
ReExecute(re_execute::Command<Spec>),
}
impl<
@@ -86,6 +89,7 @@ impl<
Self::ImportReceiptsOp(cmd) => cmd.chain_spec(),
#[cfg(feature = "dev")]
Self::TestVectors(_) => None,
Self::ReExecute(cmd) => cmd.chain_spec(),
}
}
}


@@ -627,7 +627,8 @@ fn execution_checkpoint<N: NodePrimitives>(
})
}
-fn calculate_gas_used_from_headers<N: NodePrimitives>(
+/// Calculates the total amount of gas used from the headers in the given range.
+pub fn calculate_gas_used_from_headers<N: NodePrimitives>(
provider: &StaticFileProvider<N>,
range: RangeInclusive<BlockNumber>,
) -> Result<u64, ProviderError> {


@@ -46,3 +46,4 @@
- [`reth recover`](/cli/reth/recover)
- [`reth recover storage-tries`](/cli/reth/recover/storage-tries)
- [`reth prune`](/cli/reth/prune)
- [`reth re-execute`](/cli/reth/re-execute)


@@ -23,6 +23,7 @@ Commands:
debug Various debug routines
recover Scripts for node recovery
prune Prune according to the configuration without any limits
re-execute Re-execute blocks in parallel to verify historical sync correctness
help Print this message or the help of the given subcommand(s)
Options:


@@ -0,0 +1,159 @@
# reth re-execute
Re-execute blocks in parallel to verify historical sync correctness
```bash
$ reth re-execute --help
```
```txt
Usage: reth re-execute [OPTIONS]
Options:
-h, --help
Print help (see a summary with '-h')
Datadir:
--datadir <DATA_DIR>
The path to the data dir for all reth files and subdirectories.
Defaults to the OS-specific data directory:
- Linux: `$XDG_DATA_HOME/reth/` or `$HOME/.local/share/reth/`
- Windows: `{FOLDERID_RoamingAppData}/reth/`
- macOS: `$HOME/Library/Application Support/reth/`
[default: default]
--datadir.static-files <PATH>
The absolute path to store static files in.
--config <FILE>
The path to the configuration file to use
--chain <CHAIN_OR_PATH>
The chain this node is running.
Possible values are either a built-in chain or the path to a chain specification file.
Built-in chains:
mainnet, sepolia, holesky, hoodi, dev
[default: mainnet]
Database:
--db.log-level <LOG_LEVEL>
Database logging level. Levels higher than "notice" require a debug build
Possible values:
- fatal: Enables logging for critical conditions, i.e. assertion failures
- error: Enables logging for error conditions
- warn: Enables logging for warning conditions
- notice: Enables logging for normal but significant conditions
- verbose: Enables logging for verbose informational messages
- debug: Enables logging for debug-level messages
- trace: Enables logging for trace debug-level messages
- extra: Enables logging for extra debug-level messages
--db.exclusive <EXCLUSIVE>
Open environment in exclusive/monopolistic mode. Makes it possible to open a database on an NFS volume
[possible values: true, false]
--db.max-size <MAX_SIZE>
Maximum database size (e.g., 4TB, 8MB)
--db.growth-step <GROWTH_STEP>
Database growth step (e.g., 4GB, 4KB)
--db.read-transaction-timeout <READ_TRANSACTION_TIMEOUT>
Read transaction timeout in seconds, 0 means no timeout
--from <FROM>
The height to start at
[default: 1]
--to <TO>
The height to end at. Defaults to the latest block
--num-tasks <NUM_TASKS>
Number of tasks to run in parallel
[default: 10]
Logging:
--log.stdout.format <FORMAT>
The format to use for logs written to stdout
[default: terminal]
Possible values:
- json: Represents JSON formatting for logs. This format outputs log records as JSON objects, making it suitable for structured logging
- log-fmt: Represents logfmt (key=value) formatting for logs. This format is concise and human-readable, typically used in command-line applications
- terminal: Represents terminal-friendly formatting for logs
--log.stdout.filter <FILTER>
The filter to use for logs written to stdout
[default: ]
--log.file.format <FORMAT>
The format to use for logs written to the log file
[default: terminal]
Possible values:
- json: Represents JSON formatting for logs. This format outputs log records as JSON objects, making it suitable for structured logging
- log-fmt: Represents logfmt (key=value) formatting for logs. This format is concise and human-readable, typically used in command-line applications
- terminal: Represents terminal-friendly formatting for logs
--log.file.filter <FILTER>
The filter to use for logs written to the log file
[default: debug]
--log.file.directory <PATH>
The path to put log files in
[default: <CACHE_DIR>/logs]
--log.file.max-size <SIZE>
The maximum size (in MB) of one log file
[default: 200]
--log.file.max-files <COUNT>
The maximum number of log files that will be stored. If set to 0, background file logging is disabled
[default: 5]
--log.journald
Write logs to journald
--log.journald.filter <FILTER>
The filter to use for logs written to journald
[default: error]
--color <COLOR>
Sets whether or not the formatter emits ANSI terminal escape codes for colors and other text formatting
[default: always]
Possible values:
- always: Colors on
- auto: Colors on if the output is a terminal
- never: Colors off
Display:
-v, --verbosity...
Set the minimum log level.
-v Errors
-vv Warnings
-vvv Info
-vvvv Debug
-vvvvv Traces (warning: very verbose!)
-q, --quiet
Silence all log output
```
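For example, to re-execute a specific range with more parallel workers (block numbers here are illustrative):

```bash
reth re-execute --from 18000000 --to 18100000 --num-tasks 16
```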