From 78470f7bf7723aee69b3a6b6847d0b3f53bc7844 Mon Sep 17 00:00:00 2001 From: Matthias Seitz Date: Thu, 30 Mar 2023 16:07:59 +0200 Subject: [PATCH] chore: replace test chain subs with blockchaintree (#2036) --- bin/reth/src/node/mod.rs | 131 ++++++++---------- .../executor/src/blockchain_tree/shareable.rs | 1 + 2 files changed, 56 insertions(+), 76 deletions(-) diff --git a/bin/reth/src/node/mod.rs b/bin/reth/src/node/mod.rs index 4b944f07f6..10a134879f 100644 --- a/bin/reth/src/node/mod.rs +++ b/bin/reth/src/node/mod.rs @@ -12,7 +12,7 @@ use clap::{crate_version, Parser}; use events::NodeEvent; use eyre::Context; use fdlimit::raise_fd_limit; -use futures::{pin_mut, stream::select as stream_select, Stream, StreamExt}; +use futures::{pin_mut, stream::select as stream_select, FutureExt, Stream, StreamExt}; use reth_beacon_consensus::{BeaconConsensus, BeaconConsensusEngine, BeaconEngineMessage}; use reth_db::{ database::Database, @@ -26,10 +26,7 @@ use reth_downloaders::{ headers::reverse_headers::ReverseHeadersDownloaderBuilder, }; use reth_executor::{ - blockchain_tree::{ - config::BlockchainTreeConfig, externals::TreeExternals, BlockchainTree, - ShareableBlockchainTree, - }, + blockchain_tree::{externals::TreeExternals, BlockchainTree, ShareableBlockchainTree}, Factory, }; use reth_interfaces::{ @@ -42,7 +39,6 @@ use reth_interfaces::{ }, }, sync::SyncStateUpdater, - test_utils::TestChainEventSubscriptions, }; use reth_network::{error::NetworkError, NetworkConfig, NetworkHandle, NetworkManager}; use reth_network_api::NetworkInfo; @@ -70,7 +66,7 @@ use std::{ sync::Arc, }; use tokio::sync::{ - mpsc::{unbounded_channel, UnboundedReceiver, UnboundedSender}, + mpsc::{unbounded_channel, UnboundedSender}, oneshot, watch, }; use tracing::*; @@ -169,19 +165,6 @@ impl Command { info!(target: "reth::cli", peer_id = %network.peer_id(), local_addr = %network.local_addr(), "Connected to P2P network"); debug!(target: "reth::cli", peer_id = ?network.peer_id(), "Full peer ID"); - let _rpc_server = self - .rpc - .start_rpc_server( - shareable_db.clone(), - transaction_pool.clone(), - network.clone(), - ctx.task_executor.clone(), - // TODO use real implementation - TestChainEventSubscriptions::default(), - ) - .await?; - info!(target: "reth::cli", "Started RPC server"); - if self.debug.continuous { info!(target: "reth::cli", "Continuous sync mode enabled"); } @@ -215,23 +198,6 @@ impl Command { } }; - let engine_api_handle = - self.init_engine_api(Arc::clone(&db), consensus_engine_tx.clone(), &ctx.task_executor); - info!(target: "reth::cli", "Engine API handler initialized"); - - let _auth_server = self - .rpc - .start_auth_server( - shareable_db.clone(), - transaction_pool.clone(), - network.clone(), - ctx.task_executor.clone(), - self.chain.clone(), - engine_api_handle, - ) - .await?; - info!(target: "reth::cli", "Started Auth server"); - let client = network.fetch_client().await?; let (pipeline, events) = self .build_networked_pipeline( @@ -247,17 +213,64 @@ impl Command { ctx.task_executor .spawn_critical("events task", events::handle_events(Some(network.clone()), events)); - let beacon_consensus_engine = self.build_consensus_engine( + // configure blockchain tree + let tree_externals = TreeExternals::new( db.clone(), - &ctx.task_executor, consensus, + Factory::new(self.chain.clone()), + Arc::clone(&self.chain), + ); + let blockchain_tree = + ShareableBlockchainTree::new(BlockchainTree::new(tree_externals, Default::default())?); + + let beacon_consensus_engine = BeaconConsensusEngine::new( + Arc::clone(&db), + ctx.task_executor.clone(), pipeline, + blockchain_tree.clone(), consensus_engine_rx, - )?; + self.debug.max_block, + ); + info!(target: "reth::cli", "Consensus engine initialized"); - // Run consensus engine - let (rx, tx) = tokio::sync::oneshot::channel(); + let engine_api_handle = + self.init_engine_api(Arc::clone(&db), consensus_engine_tx.clone(), &ctx.task_executor); + info!(target: "reth::cli", "Engine API handler initialized"); + + let launch_rpc = self + .rpc + .start_rpc_server( + shareable_db.clone(), + transaction_pool.clone(), + network.clone(), + ctx.task_executor.clone(), + blockchain_tree, + ) + .inspect(|_| { + info!(target: "reth::cli", "Started RPC server"); + }); + + let launch_auth = self + .rpc + .start_auth_server( + shareable_db.clone(), + transaction_pool.clone(), + network.clone(), + ctx.task_executor.clone(), + self.chain.clone(), + engine_api_handle, + ) + .inspect(|_| { + info!(target: "reth::cli", "Started Auth server"); + }); + + // launch servers + let (_rpc_server, _auth_server) = + futures::future::try_join(launch_rpc, launch_auth).await?; + + // Run consensus engine to completion + let (rx, tx) = oneshot::channel(); info!(target: "reth::cli", "Starting consensus engine"); ctx.task_executor.spawn_critical_blocking("consensus engine", async move { let res = beacon_consensus_engine.await; @@ -327,40 +340,6 @@ impl Command { Ok((pipeline, events)) } - #[allow(clippy::type_complexity)] - fn build_consensus_engine( - &self, - db: Arc, - task_executor: &TaskExecutor, - consensus: C, - pipeline: Pipeline, - message_rx: UnboundedReceiver, - ) -> eyre::Result< - BeaconConsensusEngine, C, Factory>>, - > - where - DB: Database + Unpin + 'static, - U: SyncStateUpdater + Unpin + 'static, - C: Consensus + Unpin + 'static, - { - let executor_factory = Factory::new(self.chain.clone()); - let tree_externals = - TreeExternals::new(db.clone(), consensus, executor_factory, self.chain.clone()); - let blockchain_tree = ShareableBlockchainTree::new(BlockchainTree::new( - tree_externals, - BlockchainTreeConfig::default(), - )?); - - Ok(BeaconConsensusEngine::new( - db, - task_executor.clone(), - pipeline, - blockchain_tree, - message_rx, - self.debug.max_block, - )) - } - fn load_config(&self) -> eyre::Result { confy::load_path::(&self.config).wrap_err_with(|| { format!("Could not load config file {}", self.config.as_ref().display()) diff --git a/crates/executor/src/blockchain_tree/shareable.rs b/crates/executor/src/blockchain_tree/shareable.rs index 7989df13cb..e69bd7a012 100644 --- a/crates/executor/src/blockchain_tree/shareable.rs +++ b/crates/executor/src/blockchain_tree/shareable.rs @@ -17,6 +17,7 @@ use std::{ use super::BlockchainTree; /// Shareable blockchain tree that is behind tokio::RwLock +#[derive(Clone)] pub struct ShareableBlockchainTree { /// BlockchainTree pub tree: Arc>>,