diff --git a/bin/reth/src/node/mod.rs b/bin/reth/src/node/mod.rs index ec1160920c..d55b120087 100644 --- a/bin/reth/src/node/mod.rs +++ b/bin/reth/src/node/mod.rs @@ -12,12 +12,14 @@ use eyre::Context; use fdlimit::raise_fd_limit; use futures::{stream::select as stream_select, Stream, StreamExt}; use reth_consensus::BeaconConsensus; +use reth_db::mdbx::{Env, WriteMap}; use reth_downloaders::{bodies, headers}; use reth_interfaces::consensus::{Consensus, ForkchoiceState}; use reth_net_nat::NatResolver; -use reth_network::NetworkEvent; +use reth_network::{FetchClient, NetworkConfig, NetworkEvent, NetworkHandle}; use reth_network_api::NetworkInfo; use reth_primitives::{BlockNumber, ChainSpec, H256}; +use reth_provider::ShareableDatabase; use reth_staged_sync::{utils::init::init_genesis, Config}; use reth_stages::{ prelude::*, @@ -84,109 +86,34 @@ impl Command { /// Execute `node` command // TODO: RPC pub async fn execute(self) -> eyre::Result<()> { + info!(target: "reth::cli", "reth {} starting", crate_version!()); + // Raise the fd limit of the process. // Does not do anything on windows. raise_fd_limit(); - let mut config: Config = - confy::load_path(&self.config).wrap_err("Could not load config")?; - config.peers.connect_trusted_nodes_only = self.network.trusted_only; + let mut config: Config = self.load_config()?; + info!(target: "reth::cli", path = %self.db, "Configuration loaded"); - if !self.network.trusted_peers.is_empty() { - self.network.trusted_peers.iter().for_each(|peer| { - config.peers.trusted_nodes.insert(*peer); - }); - } - - info!(target: "reth::cli", "reth {} starting", crate_version!()); + self.init_trusted_nodes(&mut config); info!(target: "reth::cli", path = %self.db, "Opening database"); let db = Arc::new(init_db(&self.db)?); info!(target: "reth::cli", "Database opened"); - if let Some(listen_addr) = self.metrics { - info!(target: "reth::cli", addr = %listen_addr, "Starting metrics endpoint"); - prometheus_exporter::initialize(listen_addr)?; - } + self.start_metrics_endpoint()?; - let genesis = init_genesis(db.clone(), self.chain.clone())?; - info!(target: "reth::cli", ?genesis, "Inserted genesis"); + init_genesis(db.clone(), self.chain.clone())?; - // TODO: This should be in a builder/factory in the consensus crate - let consensus: Arc = { - let beacon_consensus = BeaconConsensus::new(self.chain.clone()); - - if let Some(tip) = self.tip { - debug!(target: "reth::cli", %tip, "Tip manually set"); - beacon_consensus.notify_fork_choice_state(ForkchoiceState { - head_block_hash: tip, - safe_block_hash: tip, - finalized_block_hash: tip, - })?; - } else { - warn!(target: "reth::cli", "No tip specified. reth cannot communicate with consensus clients, so a tip must manually be provided for the online stages with --debug.tip ."); - } - - Arc::new(beacon_consensus) - }; - - let network = config - .network_config( - db.clone(), - self.chain.clone(), - self.network.disable_discovery, - self.network.bootnodes.clone(), - self.nat, - ) - .start_network() - .await?; + let consensus = self.init_consensus()?; + info!(target: "reth::cli", "Consensus engine initialized"); + info!(target: "reth::cli", "Connecting to P2P network"); + let netconf = self.load_network_config(&config, &db); + let network = netconf.start_network().await?; info!(target: "reth::cli", peer_id = %network.peer_id(), local_addr = %network.local_addr(), "Connected to P2P network"); - let fetch_client = Arc::new(network.fetch_client().await?); - - // Spawn headers downloader - let header_downloader = headers::task::TaskDownloader::spawn( - headers::linear::LinearDownloadBuilder::default() - .request_limit(config.stages.headers.downloader_batch_size) - .stream_batch_size(config.stages.headers.commit_threshold as usize) - .build(consensus.clone(), fetch_client.clone()), - ); - - // Spawn bodies downloader - let body_downloader = bodies::task::TaskDownloader::spawn( - bodies::concurrent::ConcurrentDownloaderBuilder::default() - .with_stream_batch_size(config.stages.bodies.downloader_stream_batch_size) - .with_request_limit(config.stages.bodies.downloader_request_limit) - .with_max_buffered_responses(config.stages.bodies.downloader_max_buffered_responses) - .with_concurrent_requests_range( - config.stages.bodies.downloader_min_concurrent_requests..= - config.stages.bodies.downloader_max_concurrent_requests, - ) - .build(fetch_client.clone(), consensus.clone(), db.clone()), - ); - - let mut pipeline = Pipeline::builder() - .with_sync_state_updater(network.clone()) - .add_stages( - OnlineStages::new(consensus.clone(), header_downloader, body_downloader).set( - TotalDifficultyStage { - commit_threshold: config.stages.total_difficulty.commit_threshold, - }, - ), - ) - .add_stages( - OfflineStages::default() - .set(SenderRecoveryStage { - batch_size: config.stages.sender_recovery.batch_size, - commit_threshold: config.stages.execution.commit_threshold, - }) - .set(ExecutionStage { - chain_spec: self.chain, - commit_threshold: config.stages.execution.commit_threshold, - }), - ) - .build(); + let mut pipeline = self.build_pipeline(&config, &network, &consensus, &db).await?; tokio::spawn(handle_events(stream_select( network.event_listener().map(Into::into), @@ -200,6 +127,143 @@ impl Command { info!(target: "reth::cli", "Finishing up"); Ok(()) } + + fn load_config(&self) -> eyre::Result { + confy::load_path::(&self.config).wrap_err("Could not load config") + } + + fn init_trusted_nodes(&self, config: &mut Config) { + config.peers.connect_trusted_nodes_only = self.network.trusted_only; + + if !self.network.trusted_peers.is_empty() { + info!(target: "reth::cli", "Adding trusted nodes"); + self.network.trusted_peers.iter().for_each(|peer| { + config.peers.trusted_nodes.insert(*peer); + }); + } + } + + fn start_metrics_endpoint(&self) -> eyre::Result<()> { + if let Some(listen_addr) = self.metrics { + info!(target: "reth::cli", addr = %listen_addr, "Starting metrics endpoint"); + prometheus_exporter::initialize(listen_addr) + } else { + Ok(()) + } + } + + fn init_consensus(&self) -> eyre::Result> { + // TODO: This should be in a builder/factory in the consensus crate + let consensus: Arc = { + let beacon_consensus = BeaconConsensus::new(self.chain.clone()); + + if let Some(tip) = self.tip { + debug!(target: "reth::cli", %tip, "Tip manually set"); + beacon_consensus.notify_fork_choice_state(ForkchoiceState { + head_block_hash: tip, + safe_block_hash: tip, + finalized_block_hash: tip, + })?; + } else { + let warn_msg = "No tip specified. \ + reth cannot communicate with consensus clients, \ + so a tip must manually be provided for the online stages with --debug.tip ."; + warn!(target: "reth::cli", warn_msg); + } + + Arc::new(beacon_consensus) + }; + + Ok(consensus) + } + + fn load_network_config( + &self, + config: &Config, + db: &Arc>, + ) -> NetworkConfig>> { + config.network_config( + db.clone(), + self.chain.clone(), + self.network.disable_discovery, + self.network.bootnodes.clone(), + self.nat, + ) + } + + async fn build_pipeline( + &self, + config: &Config, + network: &NetworkHandle, + consensus: &Arc, + db: &Arc>, + ) -> eyre::Result, NetworkHandle>> { + let fetch_client = Arc::new(network.fetch_client().await?); + + let header_downloader = self.spawn_headers_downloader(config, consensus, &fetch_client); + let body_downloader = self.spawn_bodies_downloader(config, consensus, &fetch_client, db); + let stage_conf = &config.stages; + + let pipeline = Pipeline::builder() + .with_sync_state_updater(network.clone()) + .add_stages( + OnlineStages::new(consensus.clone(), header_downloader, body_downloader).set( + TotalDifficultyStage { + commit_threshold: stage_conf.total_difficulty.commit_threshold, + }, + ), + ) + .add_stages( + OfflineStages::default() + .set(SenderRecoveryStage { + batch_size: stage_conf.sender_recovery.batch_size, + commit_threshold: stage_conf.execution.commit_threshold, + }) + .set(ExecutionStage { + chain_spec: self.chain.clone(), + commit_threshold: stage_conf.execution.commit_threshold, + }), + ) + .build(); + + Ok(pipeline) + } + + fn spawn_headers_downloader( + &self, + config: &Config, + consensus: &Arc, + fetch_client: &Arc, + ) -> reth_downloaders::headers::task::TaskDownloader { + let headers_conf = &config.stages.headers; + headers::task::TaskDownloader::spawn( + headers::linear::LinearDownloadBuilder::default() + .request_limit(headers_conf.downloader_batch_size) + .stream_batch_size(headers_conf.commit_threshold as usize) + .build(consensus.clone(), fetch_client.clone()), + ) + } + + fn spawn_bodies_downloader( + &self, + config: &Config, + consensus: &Arc, + fetch_client: &Arc, + db: &Arc>, + ) -> reth_downloaders::bodies::task::TaskDownloader { + let bodies_conf = &config.stages.bodies; + bodies::task::TaskDownloader::spawn( + bodies::concurrent::ConcurrentDownloaderBuilder::default() + .with_stream_batch_size(bodies_conf.downloader_stream_batch_size) + .with_request_limit(bodies_conf.downloader_request_limit) + .with_max_buffered_responses(bodies_conf.downloader_max_buffered_responses) + .with_concurrent_requests_range( + bodies_conf.downloader_min_concurrent_requests..= + bodies_conf.downloader_max_concurrent_requests, + ) + .build(fetch_client.clone(), consensus.clone(), db.clone()), + ) + } } /// The current high-level state of the node.