diff --git a/bin/reth/src/cli.rs b/bin/reth/src/cli.rs index 2733873828..0102e90bcd 100644 --- a/bin/reth/src/cli.rs +++ b/bin/reth/src/cli.rs @@ -38,7 +38,12 @@ pub enum Commands { /// Database debugging utilities #[command(name = "db")] Db(db::Command), - /// Run a single stage + /// Run a single stage. + /// + /// Note that this won't use the Pipeline and as a result runs stages + /// assuming that all the data can be held in memory. It is not recommended + /// to run a stage for really large block ranges if your computer does not have + /// a lot of memory to store all the data. #[command(name = "stage")] Stage(stage::Command), /// P2P Debugging utilities diff --git a/bin/reth/src/stage/mod.rs b/bin/reth/src/stage/mod.rs index a035638cd5..691ef6dbc4 100644 --- a/bin/reth/src/stage/mod.rs +++ b/bin/reth/src/stage/mod.rs @@ -12,10 +12,11 @@ use crate::{ }; use reth_consensus::BeaconConsensus; use reth_downloaders::bodies::concurrent::ConcurrentDownloader; +use reth_executor::Config as ExecutorConfig; use reth_primitives::NodeRecord; use reth_stages::{ metrics::HeaderMetrics, - stages::{bodies::BodyStage, sender_recovery::SenderRecoveryStage}, + stages::{bodies::BodyStage, execution::ExecutionStage, sender_recovery::SenderRecoveryStage}, ExecInput, Stage, StageId, Transaction, UnwindInput, }; @@ -66,7 +67,6 @@ pub struct Command { metrics: Option, /// The name of the stage to run - #[arg(long, short)] stage: StageEnum, /// The height to start at @@ -77,9 +77,13 @@ pub struct Command { #[arg(long, short)] to: u64, - /// Whether to unwind or run the stage forward + /// Normally, running the stage requires unwinding for stages that already + /// have been run, in order to not rewrite to the same database slots. + /// + /// You can optionally skip the unwinding phase if you're syncing a block + /// range that has not been synced before. #[arg(long, short)] - unwind: bool, + skip_unwind: bool, #[clap(flatten)] network: NetworkOpts, @@ -139,6 +143,8 @@ impl Command { let db = Arc::new(init_db(&self.db)?); let mut tx = Transaction::new(db.as_ref())?; + let num_blocks = self.to - self.from + 1; + match self.stage { StageEnum::Bodies => { let chain_id = self.chain.consensus.chain_id; @@ -164,7 +170,6 @@ impl Command { .await?; let fetch_client = Arc::new(network.fetch_client().await?); - dbg!(&config.stages.bodies); let mut stage = BodyStage { downloader: Arc::new( ConcurrentDownloader::new(fetch_client.clone(), consensus.clone()) @@ -173,25 +178,35 @@ impl Command { .with_concurrency(config.stages.bodies.downloader_concurrency), ), consensus: consensus.clone(), - commit_threshold: config.stages.bodies.commit_threshold, + commit_threshold: num_blocks, }; - // Unwind first - stage.unwind(&mut tx, unwind).await?; + if !self.skip_unwind { + stage.unwind(&mut tx, unwind).await?; + } stage.execute(&mut tx, input).await?; } StageEnum::Senders => { let mut stage = SenderRecoveryStage { batch_size: config.stages.sender_recovery.batch_size, - commit_threshold: self.to - self.from + 1, + commit_threshold: num_blocks, }; // Unwind first - stage.unwind(&mut tx, unwind).await?; + if !self.skip_unwind { + stage.unwind(&mut tx, unwind).await?; + } stage.execute(&mut tx, input).await?; } StageEnum::Execution => { - // let stage = ExecutionStage { config: ExecutorConfig::new_ethereum() }; + let mut stage = ExecutionStage { + config: ExecutorConfig::new_ethereum(), + commit_threshold: num_blocks, + }; + if !self.skip_unwind { + stage.unwind(&mut tx, unwind).await?; + } + stage.execute(&mut tx, input).await?; } _ => {} }