feat: Stage Tool Execution Stage (#723)

* feat(stage-tool): exec stage

* fix(stage-tool): commit for the full range

* feat: skip unwinding
This commit is contained in:
Georgios Konstantopoulos
2023-01-04 23:40:24 +02:00
committed by GitHub
parent 5c88f25ef3
commit e069248e78
2 changed files with 32 additions and 12 deletions

View File

@@ -38,7 +38,12 @@ pub enum Commands {
/// Database debugging utilities
#[command(name = "db")]
Db(db::Command),
/// Run a single stage
/// Run a single stage.
///
/// Note that this won't use the Pipeline and as a result runs stages
/// assuming that all the data can be held in memory. It is not recommended
/// to run a stage for really large block ranges if your computer does not have
/// a lot of memory to store all the data.
#[command(name = "stage")]
Stage(stage::Command),
/// P2P Debugging utilities

View File

@@ -12,10 +12,11 @@ use crate::{
};
use reth_consensus::BeaconConsensus;
use reth_downloaders::bodies::concurrent::ConcurrentDownloader;
use reth_executor::Config as ExecutorConfig;
use reth_primitives::NodeRecord;
use reth_stages::{
metrics::HeaderMetrics,
stages::{bodies::BodyStage, sender_recovery::SenderRecoveryStage},
stages::{bodies::BodyStage, execution::ExecutionStage, sender_recovery::SenderRecoveryStage},
ExecInput, Stage, StageId, Transaction, UnwindInput,
};
@@ -66,7 +67,6 @@ pub struct Command {
metrics: Option<SocketAddr>,
/// The name of the stage to run
#[arg(long, short)]
stage: StageEnum,
/// The height to start at
@@ -77,9 +77,13 @@ pub struct Command {
#[arg(long, short)]
to: u64,
/// Whether to unwind or run the stage forward
/// Normally, running the stage requires unwinding for stages that already
/// have been run, in order to not rewrite to the same database slots.
///
/// You can optionally skip the unwinding phase if you're syncing a block
/// range that has not been synced before.
#[arg(long, short)]
unwind: bool,
skip_unwind: bool,
#[clap(flatten)]
network: NetworkOpts,
@@ -139,6 +143,8 @@ impl Command {
let db = Arc::new(init_db(&self.db)?);
let mut tx = Transaction::new(db.as_ref())?;
let num_blocks = self.to - self.from + 1;
match self.stage {
StageEnum::Bodies => {
let chain_id = self.chain.consensus.chain_id;
@@ -164,7 +170,6 @@ impl Command {
.await?;
let fetch_client = Arc::new(network.fetch_client().await?);
dbg!(&config.stages.bodies);
let mut stage = BodyStage {
downloader: Arc::new(
ConcurrentDownloader::new(fetch_client.clone(), consensus.clone())
@@ -173,25 +178,35 @@ impl Command {
.with_concurrency(config.stages.bodies.downloader_concurrency),
),
consensus: consensus.clone(),
commit_threshold: config.stages.bodies.commit_threshold,
commit_threshold: num_blocks,
};
// Unwind first
stage.unwind(&mut tx, unwind).await?;
if !self.skip_unwind {
stage.unwind(&mut tx, unwind).await?;
}
stage.execute(&mut tx, input).await?;
}
StageEnum::Senders => {
let mut stage = SenderRecoveryStage {
batch_size: config.stages.sender_recovery.batch_size,
commit_threshold: self.to - self.from + 1,
commit_threshold: num_blocks,
};
// Unwind first
stage.unwind(&mut tx, unwind).await?;
if !self.skip_unwind {
stage.unwind(&mut tx, unwind).await?;
}
stage.execute(&mut tx, input).await?;
}
StageEnum::Execution => {
// let stage = ExecutionStage { config: ExecutorConfig::new_ethereum() };
let mut stage = ExecutionStage {
config: ExecutorConfig::new_ethereum(),
commit_threshold: num_blocks,
};
if !self.skip_unwind {
stage.unwind(&mut tx, unwind).await?;
}
stage.execute(&mut tx, input).await?;
}
_ => {}
}