From 759ba39311a3968e7d09cf19a6f7e344362f755f Mon Sep 17 00:00:00 2001 From: Georgios Konstantopoulos Date: Wed, 4 Jan 2023 21:53:27 +0200 Subject: [PATCH] feat: Stage tool (#716) * feat: `reth stage` command * refactor(bin): move init utils to separate file * feat(bin): scaffold stage command for one stage * fix: correctly set from/to for stage range * fix(stage-tool): add unwind before execute to re-exec otherwise we're double executing stuff * fix(stage-tool): use max commit threshold avail * chore: rm unused vars * fix(genesis-init): take a write tx only if needed this avoids blocking by accident if we took a write tx expecting that init_genesis would immediately return the hash * feat(stage-tool): add bodies stage Co-authored-by: Oliver Nordbjerg --- Cargo.lock | 1 + bin/reth/Cargo.toml | 1 + bin/reth/src/cli.rs | 15 +-- bin/reth/src/lib.rs | 1 + bin/reth/src/node/mod.rs | 64 ++---------- bin/reth/src/stage/mod.rs | 201 ++++++++++++++++++++++++++++++++++++++ bin/reth/src/util/init.rs | 61 ++++++++++++ bin/reth/src/util/mod.rs | 3 + 8 files changed, 283 insertions(+), 64 deletions(-) create mode 100644 bin/reth/src/stage/mod.rs create mode 100644 bin/reth/src/util/init.rs diff --git a/Cargo.lock b/Cargo.lock index cda3bb62f3..89310ceb63 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3495,6 +3495,7 @@ dependencies = [ "serde", "serde_json", "shellexpand", + "strum", "tempfile", "thiserror", "tokio", diff --git a/bin/reth/Cargo.toml b/bin/reth/Cargo.toml index b50d075d93..6b12565567 100644 --- a/bin/reth/Cargo.toml +++ b/bin/reth/Cargo.toml @@ -47,5 +47,6 @@ clap = { version = "4.0", features = ["derive", "cargo"] } thiserror = "1.0" tokio = { version = "1.21", features = ["sync", "macros", "rt-multi-thread"] } futures = "0.3.25" +strum = "0.24.1" tempfile = { version = "3.3.0" } backon = "0.2.0" diff --git a/bin/reth/src/cli.rs b/bin/reth/src/cli.rs index ec210bf83d..2733873828 100644 --- a/bin/reth/src/cli.rs +++ b/bin/reth/src/cli.rs @@ -1,12 +1,11 @@ //! CLI definition and entrypoint to executable -use clap::{ArgAction, Parser, Subcommand}; -use tracing_subscriber::util::SubscriberInitExt; - use crate::{ - db, node, p2p, test_eth_chain, + db, node, p2p, stage, test_eth_chain, util::reth_tracing::{self, TracingMode}, }; +use clap::{ArgAction, Parser, Subcommand}; +use tracing_subscriber::util::SubscriberInitExt; /// main function that parses cli and runs command pub async fn run() -> eyre::Result<()> { @@ -22,6 +21,7 @@ pub async fn run() -> eyre::Result<()> { Commands::Node(command) => command.execute().await, Commands::TestEthChain(command) => command.execute().await, Commands::Db(command) => command.execute().await, + Commands::Stage(command) => command.execute().await, Commands::P2P(command) => command.execute().await, } } @@ -32,12 +32,15 @@ pub enum Commands { /// Start the node #[command(name = "node")] Node(node::Command), - /// Runs Ethereum blockchain tests + /// Run Ethereum blockchain tests #[command(name = "test-chain")] TestEthChain(test_eth_chain::Command), - /// DB Debugging utilities + /// Database debugging utilities #[command(name = "db")] Db(db::Command), + /// Run a single stage + #[command(name = "stage")] + Stage(stage::Command), /// P2P Debugging utilities #[command(name = "p2p")] P2P(p2p::Command), diff --git a/bin/reth/src/lib.rs b/bin/reth/src/lib.rs index 3261009fed..ed456fa193 100644 --- a/bin/reth/src/lib.rs +++ b/bin/reth/src/lib.rs @@ -13,5 +13,6 @@ pub mod dirs; pub mod node; pub mod p2p; pub mod prometheus_exporter; +pub mod stage; pub mod test_eth_chain; pub mod util; diff --git a/bin/reth/src/node/mod.rs b/bin/reth/src/node/mod.rs index 515bb82c59..7b3d8eefb8 100644 --- a/bin/reth/src/node/mod.rs +++ b/bin/reth/src/node/mod.rs @@ -5,22 +5,18 @@ use crate::{ config::Config, dirs::{ConfigPath, DbPath}, prometheus_exporter, - util::chainspec::{chain_spec_value_parser, ChainSpecification, Genesis}, + util::{ + chainspec::{chain_spec_value_parser, ChainSpecification}, + init::{init_db, init_genesis}, + }, }; use clap::{crate_version, Parser}; use fdlimit::raise_fd_limit; use reth_consensus::BeaconConsensus; -use reth_db::{ - cursor::DbCursorRO, - database::Database, - mdbx::{Env, WriteMap}, - tables, - transaction::{DbTx, DbTxMut}, -}; use reth_downloaders::{bodies, headers}; use reth_executor::Config as ExecutorConfig; use reth_interfaces::consensus::ForkchoiceState; -use reth_primitives::{Account, Header, H256}; +use reth_primitives::H256; use reth_stages::{ metrics::HeaderMetrics, stages::{ @@ -28,7 +24,7 @@ use reth_stages::{ sender_recovery::SenderRecoveryStage, total_difficulty::TotalDifficultyStage, }, }; -use std::{net::SocketAddr, path::Path, sync::Arc}; +use std::{net::SocketAddr, sync::Arc}; use tracing::{debug, info}; /// Start the client @@ -173,51 +169,3 @@ impl Command { Ok(()) } } - -/// Opens up an existing database or creates a new one at the specified path. -fn init_db>(path: P) -> eyre::Result> { - std::fs::create_dir_all(path.as_ref())?; - let db = reth_db::mdbx::Env::::open( - path.as_ref(), - reth_db::mdbx::EnvKind::RW, - )?; - db.create_tables()?; - - Ok(db) -} - -/// Write the genesis block if it has not already been written -#[allow(clippy::field_reassign_with_default)] -fn init_genesis(db: Arc, genesis: Genesis) -> Result { - let tx = db.tx_mut()?; - if let Some((_, hash)) = tx.cursor::()?.first()? { - debug!("Genesis already written, skipping."); - return Ok(hash) - } - debug!("Writing genesis block."); - - // Insert account state - for (address, account) in &genesis.alloc { - tx.put::( - *address, - Account { - nonce: account.nonce.unwrap_or_default(), - balance: account.balance, - bytecode_hash: None, - }, - )?; - } - - // Insert header - let header: Header = genesis.into(); - let hash = header.hash_slow(); - tx.put::(0, hash)?; - tx.put::(hash, 0)?; - tx.put::((0, hash).into(), Default::default())?; - tx.put::((0, hash).into(), 0)?; - tx.put::((0, hash).into(), header.difficulty.into())?; - tx.put::((0, hash).into(), header)?; - - tx.commit()?; - Ok(hash) -} diff --git a/bin/reth/src/stage/mod.rs b/bin/reth/src/stage/mod.rs new file mode 100644 index 0000000000..a035638cd5 --- /dev/null +++ b/bin/reth/src/stage/mod.rs @@ -0,0 +1,201 @@ +//! Main `stage` command +//! +//! Stage debugging tool +use crate::{ + config::Config, + dirs::{ConfigPath, DbPath}, + prometheus_exporter, + util::{ + chainspec::{chain_spec_value_parser, ChainSpecification}, + init::{init_db, init_genesis}, + }, +}; +use reth_consensus::BeaconConsensus; +use reth_downloaders::bodies::concurrent::ConcurrentDownloader; +use reth_primitives::NodeRecord; +use reth_stages::{ + metrics::HeaderMetrics, + stages::{bodies::BodyStage, sender_recovery::SenderRecoveryStage}, + ExecInput, Stage, StageId, Transaction, UnwindInput, +}; + +use clap::Parser; +use serde::Deserialize; +use std::{net::SocketAddr, sync::Arc}; +use strum::{AsRefStr, EnumString, EnumVariantNames}; +use tracing::*; + +/// `reth stage` command +#[derive(Debug, Parser)] +pub struct Command { + /// The path to the database folder. + /// + /// Defaults to the OS-specific data directory: + /// + /// - Linux: `$XDG_DATA_HOME/reth/db` or `$HOME/.local/share/reth/db` + /// - Windows: `{FOLDERID_RoamingAppData}/reth/db` + /// - macOS: `$HOME/Library/Application Support/reth/db` + #[arg(long, value_name = "PATH", verbatim_doc_comment, default_value_t)] + db: DbPath, + + /// The path to the configuration file to use. + #[arg(long, value_name = "FILE", verbatim_doc_comment, default_value_t)] + config: ConfigPath, + + /// The chain this node is running. + /// + /// Possible values are either a built-in chain or the path to a chain specification file. + /// + /// Built-in chains: + /// - mainnet + /// - goerli + /// - sepolia + #[arg( + long, + value_name = "CHAIN_OR_PATH", + verbatim_doc_comment, + default_value = "mainnet", + value_parser = chain_spec_value_parser + )] + chain: ChainSpecification, + + /// Enable Prometheus metrics. + /// + /// The metrics will be served at the given interface and port. + #[clap(long, value_name = "SOCKET")] + metrics: Option, + + /// The name of the stage to run + #[arg(long, short)] + stage: StageEnum, + + /// The height to start at + #[arg(long)] + from: u64, + + /// The end of the stage + #[arg(long, short)] + to: u64, + + /// Whether to unwind or run the stage forward + #[arg(long, short)] + unwind: bool, + + #[clap(flatten)] + network: NetworkOpts, +} + +#[derive( + Debug, Clone, Copy, Eq, PartialEq, AsRefStr, EnumVariantNames, EnumString, Deserialize, +)] +#[serde(rename_all = "snake_case")] +#[strum(serialize_all = "kebab-case")] +enum StageEnum { + Headers, + Bodies, + Senders, + Execution, +} + +#[derive(Debug, Parser)] +struct NetworkOpts { + /// Disable the discovery service. + #[arg(short, long)] + disable_discovery: bool, + + /// Target trusted peer enodes + /// --trusted-peers enode://abcd@192.168.0.1:30303 + #[arg(long)] + trusted_peers: Vec, + + /// Connect only to trusted peers + #[arg(long)] + trusted_only: bool, +} + +impl Command { + /// Execute `stage` command + pub async fn execute(&self) -> eyre::Result<()> { + // Raise the fd limit of the process. + // Does not do anything on windows. + fdlimit::raise_fd_limit(); + + if let Some(listen_addr) = self.metrics { + info!("Starting metrics endpoint at {}", listen_addr); + prometheus_exporter::initialize(listen_addr)?; + HeaderMetrics::describe(); + } + + let config: Config = confy::load_path(&self.config).unwrap_or_default(); + info!("reth {} starting stage {:?}", clap::crate_version!(), self.stage); + + let input = ExecInput { + previous_stage: Some((StageId("No Previous Stage"), self.to)), + stage_progress: Some(self.from), + }; + + let unwind = UnwindInput { stage_progress: self.to, unwind_to: self.from, bad_block: None }; + + let db = Arc::new(init_db(&self.db)?); + let mut tx = Transaction::new(db.as_ref())?; + + match self.stage { + StageEnum::Bodies => { + let chain_id = self.chain.consensus.chain_id; + let consensus = Arc::new(BeaconConsensus::new(self.chain.consensus.clone())); + let genesis_hash = init_genesis(db.clone(), self.chain.genesis.clone())?; + + let mut config = config; + config.peers.connect_trusted_nodes_only = self.network.trusted_only; + if !self.network.trusted_peers.is_empty() { + self.network.trusted_peers.iter().for_each(|peer| { + config.peers.trusted_nodes.insert(*peer); + }); + } + + let network = config + .network_config( + db.clone(), + chain_id, + genesis_hash, + self.network.disable_discovery, + ) + .start_network() + .await?; + let fetch_client = Arc::new(network.fetch_client().await?); + + dbg!(&config.stages.bodies); + let mut stage = BodyStage { + downloader: Arc::new( + ConcurrentDownloader::new(fetch_client.clone(), consensus.clone()) + .with_batch_size(config.stages.bodies.downloader_batch_size) + .with_retries(config.stages.bodies.downloader_retries) + .with_concurrency(config.stages.bodies.downloader_concurrency), + ), + consensus: consensus.clone(), + commit_threshold: config.stages.bodies.commit_threshold, + }; + + // Unwind first + stage.unwind(&mut tx, unwind).await?; + stage.execute(&mut tx, input).await?; + } + StageEnum::Senders => { + let mut stage = SenderRecoveryStage { + batch_size: config.stages.sender_recovery.batch_size, + commit_threshold: self.to - self.from + 1, + }; + + // Unwind first + stage.unwind(&mut tx, unwind).await?; + stage.execute(&mut tx, input).await?; + } + StageEnum::Execution => { + // let stage = ExecutionStage { config: ExecutorConfig::new_ethereum() }; + } + _ => {} + } + + Ok(()) + } +} diff --git a/bin/reth/src/util/init.rs b/bin/reth/src/util/init.rs new file mode 100644 index 0000000000..7bbfa5f4dd --- /dev/null +++ b/bin/reth/src/util/init.rs @@ -0,0 +1,61 @@ +use crate::util::chainspec::Genesis; +use reth_db::{ + cursor::DbCursorRO, + database::Database, + mdbx::{Env, WriteMap}, + tables, + transaction::{DbTx, DbTxMut}, +}; +use reth_primitives::{Account, Header, H256}; +use std::{path::Path, sync::Arc}; +use tracing::debug; + +/// Opens up an existing database or creates a new one at the specified path. +pub fn init_db>(path: P) -> eyre::Result> { + std::fs::create_dir_all(path.as_ref())?; + let db = reth_db::mdbx::Env::::open( + path.as_ref(), + reth_db::mdbx::EnvKind::RW, + )?; + db.create_tables()?; + + Ok(db) +} + +/// Write the genesis block if it has not already been written +#[allow(clippy::field_reassign_with_default)] +pub fn init_genesis(db: Arc, genesis: Genesis) -> Result { + let tx = db.tx()?; + if let Some((_, hash)) = tx.cursor::()?.first()? { + debug!("Genesis already written, skipping."); + return Ok(hash) + } + drop(tx); + debug!("Writing genesis block."); + let tx = db.tx_mut()?; + + // Insert account state + for (address, account) in &genesis.alloc { + tx.put::( + *address, + Account { + nonce: account.nonce.unwrap_or_default(), + balance: account.balance, + bytecode_hash: None, + }, + )?; + } + + // Insert header + let header: Header = genesis.into(); + let hash = header.hash_slow(); + tx.put::(0, hash)?; + tx.put::(hash, 0)?; + tx.put::((0, hash).into(), Default::default())?; + tx.put::((0, hash).into(), 0)?; + tx.put::((0, hash).into(), header.difficulty.into())?; + tx.put::((0, hash).into(), header)?; + + tx.commit()?; + Ok(hash) +} diff --git a/bin/reth/src/util/mod.rs b/bin/reth/src/util/mod.rs index 28cfb73a86..72a3847c9c 100644 --- a/bin/reth/src/util/mod.rs +++ b/bin/reth/src/util/mod.rs @@ -10,6 +10,9 @@ use walkdir::{DirEntry, WalkDir}; /// Utilities for parsing chainspecs pub mod chainspec; +/// Utilities for initializing parts of the chain +pub mod init; + /// Finds all files in a directory with a given postfix. pub(crate) fn find_all_files_with_postfix(path: &Path, postfix: &str) -> Vec { WalkDir::new(path)