chore: replace test chain subs with blockchaintree (#2036)

This commit is contained in:
Matthias Seitz
2023-03-30 16:07:59 +02:00
committed by GitHub
parent 6261262a20
commit 78470f7bf7
2 changed files with 56 additions and 76 deletions

View File

@@ -12,7 +12,7 @@ use clap::{crate_version, Parser};
use events::NodeEvent;
use eyre::Context;
use fdlimit::raise_fd_limit;
use futures::{pin_mut, stream::select as stream_select, Stream, StreamExt};
use futures::{pin_mut, stream::select as stream_select, FutureExt, Stream, StreamExt};
use reth_beacon_consensus::{BeaconConsensus, BeaconConsensusEngine, BeaconEngineMessage};
use reth_db::{
database::Database,
@@ -26,10 +26,7 @@ use reth_downloaders::{
headers::reverse_headers::ReverseHeadersDownloaderBuilder,
};
use reth_executor::{
blockchain_tree::{
config::BlockchainTreeConfig, externals::TreeExternals, BlockchainTree,
ShareableBlockchainTree,
},
blockchain_tree::{externals::TreeExternals, BlockchainTree, ShareableBlockchainTree},
Factory,
};
use reth_interfaces::{
@@ -42,7 +39,6 @@ use reth_interfaces::{
},
},
sync::SyncStateUpdater,
test_utils::TestChainEventSubscriptions,
};
use reth_network::{error::NetworkError, NetworkConfig, NetworkHandle, NetworkManager};
use reth_network_api::NetworkInfo;
@@ -70,7 +66,7 @@ use std::{
sync::Arc,
};
use tokio::sync::{
mpsc::{unbounded_channel, UnboundedReceiver, UnboundedSender},
mpsc::{unbounded_channel, UnboundedSender},
oneshot, watch,
};
use tracing::*;
@@ -169,19 +165,6 @@ impl Command {
info!(target: "reth::cli", peer_id = %network.peer_id(), local_addr = %network.local_addr(), "Connected to P2P network");
debug!(target: "reth::cli", peer_id = ?network.peer_id(), "Full peer ID");
let _rpc_server = self
.rpc
.start_rpc_server(
shareable_db.clone(),
transaction_pool.clone(),
network.clone(),
ctx.task_executor.clone(),
// TODO use real implementation
TestChainEventSubscriptions::default(),
)
.await?;
info!(target: "reth::cli", "Started RPC server");
if self.debug.continuous {
info!(target: "reth::cli", "Continuous sync mode enabled");
}
@@ -215,23 +198,6 @@ impl Command {
}
};
let engine_api_handle =
self.init_engine_api(Arc::clone(&db), consensus_engine_tx.clone(), &ctx.task_executor);
info!(target: "reth::cli", "Engine API handler initialized");
let _auth_server = self
.rpc
.start_auth_server(
shareable_db.clone(),
transaction_pool.clone(),
network.clone(),
ctx.task_executor.clone(),
self.chain.clone(),
engine_api_handle,
)
.await?;
info!(target: "reth::cli", "Started Auth server");
let client = network.fetch_client().await?;
let (pipeline, events) = self
.build_networked_pipeline(
@@ -247,17 +213,64 @@ impl Command {
ctx.task_executor
.spawn_critical("events task", events::handle_events(Some(network.clone()), events));
let beacon_consensus_engine = self.build_consensus_engine(
// configure blockchain tree
let tree_externals = TreeExternals::new(
db.clone(),
&ctx.task_executor,
consensus,
Factory::new(self.chain.clone()),
Arc::clone(&self.chain),
);
let blockchain_tree =
ShareableBlockchainTree::new(BlockchainTree::new(tree_externals, Default::default())?);
let beacon_consensus_engine = BeaconConsensusEngine::new(
Arc::clone(&db),
ctx.task_executor.clone(),
pipeline,
blockchain_tree.clone(),
consensus_engine_rx,
)?;
self.debug.max_block,
);
info!(target: "reth::cli", "Consensus engine initialized");
// Run consensus engine
let (rx, tx) = tokio::sync::oneshot::channel();
let engine_api_handle =
self.init_engine_api(Arc::clone(&db), consensus_engine_tx.clone(), &ctx.task_executor);
info!(target: "reth::cli", "Engine API handler initialized");
let launch_rpc = self
.rpc
.start_rpc_server(
shareable_db.clone(),
transaction_pool.clone(),
network.clone(),
ctx.task_executor.clone(),
blockchain_tree,
)
.inspect(|_| {
info!(target: "reth::cli", "Started RPC server");
});
let launch_auth = self
.rpc
.start_auth_server(
shareable_db.clone(),
transaction_pool.clone(),
network.clone(),
ctx.task_executor.clone(),
self.chain.clone(),
engine_api_handle,
)
.inspect(|_| {
info!(target: "reth::cli", "Started Auth server");
});
// launch servers
let (_rpc_server, _auth_server) =
futures::future::try_join(launch_rpc, launch_auth).await?;
// Run consensus engine to completion
let (rx, tx) = oneshot::channel();
info!(target: "reth::cli", "Starting consensus engine");
ctx.task_executor.spawn_critical_blocking("consensus engine", async move {
let res = beacon_consensus_engine.await;
@@ -327,40 +340,6 @@ impl Command {
Ok((pipeline, events))
}
#[allow(clippy::type_complexity)]
fn build_consensus_engine<DB, U, C>(
&self,
db: Arc<DB>,
task_executor: &TaskExecutor,
consensus: C,
pipeline: Pipeline<DB, U>,
message_rx: UnboundedReceiver<BeaconEngineMessage>,
) -> eyre::Result<
BeaconConsensusEngine<DB, TaskExecutor, U, ShareableBlockchainTree<Arc<DB>, C, Factory>>,
>
where
DB: Database + Unpin + 'static,
U: SyncStateUpdater + Unpin + 'static,
C: Consensus + Unpin + 'static,
{
let executor_factory = Factory::new(self.chain.clone());
let tree_externals =
TreeExternals::new(db.clone(), consensus, executor_factory, self.chain.clone());
let blockchain_tree = ShareableBlockchainTree::new(BlockchainTree::new(
tree_externals,
BlockchainTreeConfig::default(),
)?);
Ok(BeaconConsensusEngine::new(
db,
task_executor.clone(),
pipeline,
blockchain_tree,
message_rx,
self.debug.max_block,
))
}
fn load_config(&self) -> eyre::Result<Config> {
confy::load_path::<Config>(&self.config).wrap_err_with(|| {
format!("Could not load config file {}", self.config.as_ref().display())

View File

@@ -17,6 +17,7 @@ use std::{
use super::BlockchainTree;
/// Shareable blockchain tree that is behind tokio::RwLock
#[derive(Clone)]
pub struct ShareableBlockchainTree<DB: Database, C: Consensus, EF: ExecutorFactory> {
/// BlockchainTree
pub tree: Arc<RwLock<BlockchainTree<DB, C, EF>>>,