//! Support for customizing the node use crate::{ args::{ get_secret_key, DatabaseArgs, DebugArgs, DevArgs, DiscoveryArgs, NetworkArgs, PayloadBuilderArgs, PruningArgs, RpcServerArgs, TxPoolArgs, }, dirs::{ChainPath, DataDirPath}, metrics::prometheus_exporter, utils::get_single_header, }; use discv5::ListenConfig; use metrics_exporter_prometheus::PrometheusHandle; use once_cell::sync::Lazy; use reth_auto_seal_consensus::{AutoSealConsensus, MiningMode}; use reth_beacon_consensus::BeaconConsensus; use reth_config::{ config::{PruneConfig, StageConfig}, Config, }; use reth_db::{database::Database, database_metrics::DatabaseMetrics}; use reth_downloaders::{ bodies::bodies::BodiesDownloaderBuilder, headers::reverse_headers::ReverseHeadersDownloaderBuilder, }; use reth_interfaces::{ consensus::Consensus, p2p::{ bodies::{client::BodiesClient, downloader::BodyDownloader}, headers::{client::HeadersClient, downloader::HeaderDownloader}, }, RethResult, }; use reth_network::{NetworkBuilder, NetworkConfig, NetworkManager}; use reth_node_api::ConfigureEvm; use reth_primitives::{ constants::eip4844::MAINNET_KZG_TRUSTED_SETUP, kzg::KzgSettings, stage::StageId, BlockHashOrNumber, BlockNumber, ChainSpec, Head, SealedHeader, TxHash, B256, MAINNET, }; use reth_provider::{ providers::StaticFileProvider, BlockHashReader, BlockNumReader, HeaderProvider, HeaderSyncMode, ProviderFactory, StageCheckpointReader, }; use reth_revm::stack::{Hook, InspectorStackConfig}; use reth_stages::{ prelude::*, stages::{ AccountHashingStage, ExecutionStage, ExecutionStageThresholds, IndexAccountHistoryStage, IndexStorageHistoryStage, MerkleStage, SenderRecoveryStage, StorageHashingStage, TransactionLookupStage, }, }; use reth_static_file::StaticFileProducer; use reth_tasks::TaskExecutor; use secp256k1::SecretKey; use std::{net::SocketAddr, path::PathBuf, sync::Arc}; use tokio::sync::{mpsc::Receiver, watch}; use tracing::*; /// The default prometheus recorder handle. We use a global static to ensure that it is only /// installed once. pub static PROMETHEUS_RECORDER_HANDLE: Lazy = Lazy::new(|| prometheus_exporter::install_recorder().unwrap()); /// This includes all necessary configuration to launch the node. /// The individual configuration options can be overwritten before launching the node. /// /// # Example /// ```rust /// # use reth_tasks::{TaskManager, TaskSpawner}; /// # use reth_node_core::{ /// # node_config::NodeConfig, /// # args::RpcServerArgs, /// # }; /// # use reth_rpc_builder::RpcModuleSelection; /// # use tokio::runtime::Handle; /// /// async fn t() { /// let handle = Handle::current(); /// let manager = TaskManager::new(handle); /// let executor = manager.executor(); /// /// // create the builder /// let builder = NodeConfig::default(); /// /// // configure the rpc apis /// let mut rpc = RpcServerArgs::default().with_http().with_ws(); /// rpc.http_api = Some(RpcModuleSelection::All); /// let builder = builder.with_rpc(rpc); /// } /// ``` /// /// This can also be used to launch a node with a temporary test database. This can be done with /// the [NodeConfig::test] method. /// /// # Example /// ```rust /// # use reth_tasks::{TaskManager, TaskSpawner}; /// # use reth_node_core::{ /// # node_config::NodeConfig, /// # args::RpcServerArgs, /// # }; /// # use reth_rpc_builder::RpcModuleSelection; /// # use tokio::runtime::Handle; /// /// async fn t() { /// let handle = Handle::current(); /// let manager = TaskManager::new(handle); /// let executor = manager.executor(); /// /// // create the builder with a test database, using the `test` method /// let builder = NodeConfig::test(); /// /// // configure the rpc apis /// let mut rpc = RpcServerArgs::default().with_http().with_ws(); /// rpc.http_api = Some(RpcModuleSelection::All); /// let builder = builder.with_rpc(rpc); /// } /// ``` #[derive(Debug, Clone)] pub struct NodeConfig { /// The path to the configuration file to use. pub config: Option, /// The chain this node is running. /// /// Possible values are either a built-in chain or the path to a chain specification file. pub chain: Arc, /// Enable Prometheus metrics. /// /// The metrics will be served at the given interface and port. pub metrics: Option, /// Add a new instance of a node. /// /// Configures the ports of the node to avoid conflicts with the defaults. /// This is useful for running multiple nodes on the same machine. /// /// Max number of instances is 200. It is chosen in a way so that it's not possible to have /// port numbers that conflict with each other. /// /// Changes to the following port numbers: /// - DISCOVERY_PORT: default + `instance` - 1 /// - DISCOVERY_V5_PORT: default + `instance` - 1 /// - AUTH_PORT: default + `instance` * 100 - 100 /// - HTTP_RPC_PORT: default - `instance` + 1 /// - WS_RPC_PORT: default + `instance` * 2 - 2 pub instance: u16, /// All networking related arguments pub network: NetworkArgs, /// All rpc related arguments pub rpc: RpcServerArgs, /// All txpool related arguments with --txpool prefix pub txpool: TxPoolArgs, /// All payload builder related arguments pub builder: PayloadBuilderArgs, /// All debug related arguments with --debug prefix pub debug: DebugArgs, /// All database related arguments pub db: DatabaseArgs, /// All dev related arguments with --dev prefix pub dev: DevArgs, /// All pruning related arguments pub pruning: PruningArgs, } impl NodeConfig { /// Creates a testing [NodeConfig], causing the database to be launched ephemerally. pub fn test() -> Self { Self::default() // set all ports to zero by default for test instances .with_unused_ports() } /// Sets --dev mode for the node pub const fn dev(mut self) -> Self { self.dev.dev = true; self } /// Set the config file for the node pub fn with_config(mut self, config: impl Into) -> Self { self.config = Some(config.into()); self } /// Set the [ChainSpec] for the node pub fn with_chain(mut self, chain: impl Into>) -> Self { self.chain = chain.into(); self } /// Set the metrics address for the node pub fn with_metrics(mut self, metrics: SocketAddr) -> Self { self.metrics = Some(metrics); self } /// Set the instance for the node pub fn with_instance(mut self, instance: u16) -> Self { self.instance = instance; self } /// Set the network args for the node pub fn with_network(mut self, network: NetworkArgs) -> Self { self.network = network; self } /// Set the rpc args for the node pub fn with_rpc(mut self, rpc: RpcServerArgs) -> Self { self.rpc = rpc; self } /// Set the txpool args for the node pub fn with_txpool(mut self, txpool: TxPoolArgs) -> Self { self.txpool = txpool; self } /// Set the builder args for the node pub fn with_payload_builder(mut self, builder: PayloadBuilderArgs) -> Self { self.builder = builder; self } /// Set the debug args for the node pub fn with_debug(mut self, debug: DebugArgs) -> Self { self.debug = debug; self } /// Set the database args for the node pub fn with_db(mut self, db: DatabaseArgs) -> Self { self.db = db; self } /// Set the dev args for the node pub fn with_dev(mut self, dev: DevArgs) -> Self { self.dev = dev; self } /// Set the pruning args for the node pub fn with_pruning(mut self, pruning: PruningArgs) -> Self { self.pruning = pruning; self } /// Get the network secret from the given data dir pub fn network_secret(&self, data_dir: &ChainPath) -> eyre::Result { let network_secret_path = self.network.p2p_secret_key.clone().unwrap_or_else(|| data_dir.p2p_secret_path()); debug!(target: "reth::cli", ?network_secret_path, "Loading p2p key file"); let secret_key = get_secret_key(&network_secret_path)?; Ok(secret_key) } /// Returns the initial pipeline target, based on whether or not the node is running in /// `debug.tip` mode, `debug.continuous` mode, or neither. /// /// If running in `debug.tip` mode, the configured tip is returned. /// Otherwise, if running in `debug.continuous` mode, the genesis hash is returned. /// Otherwise, `None` is returned. This is what the node will do by default. pub fn initial_pipeline_target(&self, genesis_hash: B256) -> Option { if let Some(tip) = self.debug.tip { // Set the provided tip as the initial pipeline target. debug!(target: "reth::cli", %tip, "Tip manually set"); Some(tip) } else if self.debug.continuous { // Set genesis as the initial pipeline target. // This will allow the downloader to start debug!(target: "reth::cli", "Continuous sync mode enabled"); Some(genesis_hash) } else { None } } /// Returns pruning configuration. pub fn prune_config(&self) -> eyre::Result> { self.pruning.prune_config(Arc::clone(&self.chain)) } /// Returns the max block that the node should run to, looking it up from the network if /// necessary pub async fn max_block( &self, network_client: &Client, provider: Provider, ) -> eyre::Result> where Provider: HeaderProvider, Client: HeadersClient, { let max_block = if let Some(block) = self.debug.max_block { Some(block) } else if let Some(tip) = self.debug.tip { Some(self.lookup_or_fetch_tip(provider, network_client, tip).await?) } else { None }; Ok(max_block) } /// Get the [MiningMode] from the given dev args pub fn mining_mode(&self, pending_transactions_listener: Receiver) -> MiningMode { if let Some(interval) = self.dev.block_time { MiningMode::interval(interval) } else if let Some(max_transactions) = self.dev.block_max_transactions { MiningMode::instant(max_transactions, pending_transactions_listener) } else { info!(target: "reth::cli", "No mining mode specified, defaulting to ReadyTransaction"); MiningMode::instant(1, pending_transactions_listener) } } /// Create the [NetworkConfig] for the node pub fn network_config( &self, config: &Config, client: C, executor: TaskExecutor, head: Head, data_dir: &ChainPath, ) -> eyre::Result> { info!(target: "reth::cli", "Connecting to P2P network"); let secret_key = self.network_secret(data_dir)?; let default_peers_path = data_dir.known_peers_path(); Ok(self.load_network_config(config, client, executor, head, secret_key, default_peers_path)) } /// Create the [NetworkBuilder]. /// /// This only configures it and does not spawn it. pub async fn build_network( &self, config: &Config, client: C, executor: TaskExecutor, head: Head, data_dir: &ChainPath, ) -> eyre::Result> where C: BlockNumReader, { let network_config = self.network_config(config, client, executor, head, data_dir)?; let builder = NetworkManager::builder(network_config).await?; Ok(builder) } /// Returns the [Consensus] instance to use. /// /// By default this will be a [BeaconConsensus] instance, but if the `--dev` flag is set, it /// will be an [AutoSealConsensus] instance. pub fn consensus(&self) -> Arc { if self.dev.dev { Arc::new(AutoSealConsensus::new(Arc::clone(&self.chain))) } else { Arc::new(BeaconConsensus::new(Arc::clone(&self.chain))) } } /// Constructs a [Pipeline] that's wired to the network #[allow(clippy::too_many_arguments)] pub async fn build_networked_pipeline( &self, config: &StageConfig, client: Client, consensus: Arc, provider_factory: ProviderFactory, task_executor: &TaskExecutor, metrics_tx: reth_stages::MetricEventsSender, prune_config: Option, max_block: Option, static_file_producer: StaticFileProducer, evm_config: EvmConfig, ) -> eyre::Result> where DB: Database + Unpin + Clone + 'static, Client: HeadersClient + BodiesClient + Clone + 'static, EvmConfig: ConfigureEvm + Clone + 'static, { // building network downloaders using the fetch client let header_downloader = ReverseHeadersDownloaderBuilder::new(config.headers) .build(client.clone(), Arc::clone(&consensus)) .into_task_with(task_executor); let body_downloader = BodiesDownloaderBuilder::new(config.bodies) .build(client, Arc::clone(&consensus), provider_factory.clone()) .into_task_with(task_executor); let pipeline = self .build_pipeline( provider_factory, config, header_downloader, body_downloader, consensus, max_block, self.debug.continuous, metrics_tx, prune_config, static_file_producer, evm_config, ) .await?; Ok(pipeline) } /// Loads 'MAINNET_KZG_TRUSTED_SETUP' pub fn kzg_settings(&self) -> eyre::Result> { Ok(Arc::clone(&MAINNET_KZG_TRUSTED_SETUP)) } /// Installs the prometheus recorder. pub fn install_prometheus_recorder(&self) -> eyre::Result { Ok(PROMETHEUS_RECORDER_HANDLE.clone()) } /// Serves the prometheus endpoint over HTTP with the given database and prometheus handle. pub async fn start_metrics_endpoint( &self, prometheus_handle: PrometheusHandle, db: Metrics, static_file_provider: StaticFileProvider, ) -> eyre::Result<()> where Metrics: DatabaseMetrics + 'static + Send + Sync, { if let Some(listen_addr) = self.metrics { info!(target: "reth::cli", addr = %listen_addr, "Starting metrics endpoint"); prometheus_exporter::serve( listen_addr, prometheus_handle, db, static_file_provider, metrics_process::Collector::default(), ) .await?; } Ok(()) } /// Fetches the head block from the database. /// /// If the database is empty, returns the genesis block. pub fn lookup_head(&self, factory: ProviderFactory) -> RethResult { let provider = factory.provider()?; let head = provider.get_stage_checkpoint(StageId::Finish)?.unwrap_or_default().block_number; let header = provider .header_by_number(head)? .expect("the header for the latest block is missing, database is corrupt"); let total_difficulty = provider .header_td_by_number(head)? .expect("the total difficulty for the latest block is missing, database is corrupt"); let hash = provider .block_hash(head)? .expect("the hash for the latest block is missing, database is corrupt"); Ok(Head { number: head, hash, difficulty: header.difficulty, total_difficulty, timestamp: header.timestamp, }) } /// Attempt to look up the block number for the tip hash in the database. /// If it doesn't exist, download the header and return the block number. /// /// NOTE: The download is attempted with infinite retries. pub async fn lookup_or_fetch_tip( &self, provider: Provider, client: Client, tip: B256, ) -> RethResult where Provider: HeaderProvider, Client: HeadersClient, { let header = provider.header_by_hash_or_number(tip.into())?; // try to look up the header in the database if let Some(header) = header { info!(target: "reth::cli", ?tip, "Successfully looked up tip block in the database"); return Ok(header.number); } Ok(self.fetch_tip_from_network(client, tip.into()).await?.number) } /// Attempt to look up the block with the given number and return the header. /// /// NOTE: The download is attempted with infinite retries. pub async fn fetch_tip_from_network( &self, client: Client, tip: BlockHashOrNumber, ) -> RethResult where Client: HeadersClient, { info!(target: "reth::cli", ?tip, "Fetching tip block from the network."); loop { match get_single_header(&client, tip).await { Ok(tip_header) => { info!(target: "reth::cli", ?tip, "Successfully fetched tip"); return Ok(tip_header); } Err(error) => { error!(target: "reth::cli", %error, "Failed to fetch the tip. Retrying..."); } } } } /// Builds the [NetworkConfig] with the given [ProviderFactory]. pub fn load_network_config( &self, config: &Config, client: C, executor: TaskExecutor, head: Head, secret_key: SecretKey, default_peers_path: PathBuf, ) -> NetworkConfig { let cfg_builder = self .network .network_config(config, self.chain.clone(), secret_key, default_peers_path) .with_task_executor(Box::new(executor)) .set_head(head) .listener_addr(SocketAddr::new( self.network.addr, // set discovery port based on instance number self.network.port + self.instance - 1, )) .discovery_addr(SocketAddr::new( self.network.discovery.addr, // set discovery port based on instance number self.network.discovery.port + self.instance - 1, )); let config = cfg_builder.build(client); if !self.network.discovery.enable_discv5_discovery { return config } // work around since discv5 config builder can't be integrated into network config builder // due to unsatisfied trait bounds config.discovery_v5_with_config_builder(|builder| { let DiscoveryArgs { discv5_addr, discv5_port, .. } = self.network.discovery; builder .discv5_config( discv5::ConfigBuilder::new(ListenConfig::from(Into::::into(( discv5_addr, discv5_port + self.instance - 1, )))) .build(), ) .build() }) } /// Builds the [Pipeline] with the given [ProviderFactory] and downloaders. #[allow(clippy::too_many_arguments)] pub async fn build_pipeline( &self, provider_factory: ProviderFactory, stage_config: &StageConfig, header_downloader: H, body_downloader: B, consensus: Arc, max_block: Option, continuous: bool, metrics_tx: reth_stages::MetricEventsSender, prune_config: Option, static_file_producer: StaticFileProducer, evm_config: EvmConfig, ) -> eyre::Result> where DB: Database + Clone + 'static, H: HeaderDownloader + 'static, B: BodyDownloader + 'static, EvmConfig: ConfigureEvm + Clone + 'static, { let mut builder = Pipeline::builder(); if let Some(max_block) = max_block { debug!(target: "reth::cli", max_block, "Configuring builder to use max block"); builder = builder.with_max_block(max_block) } let (tip_tx, tip_rx) = watch::channel(B256::ZERO); let factory = reth_revm::EvmProcessorFactory::new(self.chain.clone(), evm_config); let stack_config = InspectorStackConfig { use_printer_tracer: self.debug.print_inspector, hook: if let Some(hook_block) = self.debug.hook_block { Hook::Block(hook_block) } else if let Some(tx) = self.debug.hook_transaction { Hook::Transaction(tx) } else if self.debug.hook_all { Hook::All } else { Hook::None }, }; let factory = factory.with_stack_config(stack_config); let prune_modes = prune_config.map(|prune| prune.segments).unwrap_or_default(); let header_mode = if continuous { HeaderSyncMode::Continuous } else { HeaderSyncMode::Tip(tip_rx) }; let pipeline = builder .with_tip_sender(tip_tx) .with_metrics_tx(metrics_tx.clone()) .add_stages( DefaultStages::new( provider_factory.clone(), header_mode, Arc::clone(&consensus), header_downloader, body_downloader, factory.clone(), stage_config.etl.clone(), ) .set(SenderRecoveryStage { commit_threshold: stage_config.sender_recovery.commit_threshold, }) .set( ExecutionStage::new( factory, ExecutionStageThresholds { max_blocks: stage_config.execution.max_blocks, max_changes: stage_config.execution.max_changes, max_cumulative_gas: stage_config.execution.max_cumulative_gas, max_duration: stage_config.execution.max_duration, }, stage_config .merkle .clean_threshold .max(stage_config.account_hashing.clean_threshold) .max(stage_config.storage_hashing.clean_threshold), prune_modes.clone(), ) .with_metrics_tx(metrics_tx), ) .set(AccountHashingStage::new( stage_config.account_hashing.clean_threshold, stage_config.account_hashing.commit_threshold, stage_config.etl.clone(), )) .set(StorageHashingStage::new( stage_config.storage_hashing.clean_threshold, stage_config.storage_hashing.commit_threshold, stage_config.etl.clone(), )) .set(MerkleStage::new_execution(stage_config.merkle.clean_threshold)) .set(TransactionLookupStage::new( stage_config.transaction_lookup.chunk_size, stage_config.etl.clone(), prune_modes.transaction_lookup, )) .set(IndexAccountHistoryStage::new( stage_config.index_account_history.commit_threshold, prune_modes.account_history, stage_config.etl.clone(), )) .set(IndexStorageHistoryStage::new( stage_config.index_storage_history.commit_threshold, prune_modes.storage_history, stage_config.etl.clone(), )), ) .build(provider_factory, static_file_producer); Ok(pipeline) } /// Change rpc port numbers based on the instance number, using the inner /// [RpcServerArgs::adjust_instance_ports] method. pub fn adjust_instance_ports(&mut self) { self.rpc.adjust_instance_ports(self.instance); } /// Sets networking and RPC ports to zero, causing the OS to choose random unused ports when /// sockets are bound. pub fn with_unused_ports(mut self) -> Self { self.rpc = self.rpc.with_unused_ports(); self.network = self.network.with_unused_ports(); self } } impl Default for NodeConfig { fn default() -> Self { Self { config: None, chain: MAINNET.clone(), metrics: None, instance: 1, network: NetworkArgs::default(), rpc: RpcServerArgs::default(), txpool: TxPoolArgs::default(), builder: PayloadBuilderArgs::default(), debug: DebugArgs::default(), db: DatabaseArgs::default(), dev: DevArgs::default(), pruning: PruningArgs::default(), } } }