feat(sync): beacon consensus engine (#1845)

This commit is contained in:
Roman Krasiuk
2023-03-23 20:18:19 +02:00
committed by GitHub
parent 84af91737d
commit ce40bea46e
27 changed files with 1918 additions and 1141 deletions

View File

@@ -5,6 +5,7 @@ use clap::Args;
use jsonrpsee::{core::Error as RpcError, server::ServerHandle};
use reth_interfaces::events::ChainEventSubscriptions;
use reth_network_api::{NetworkInfo, Peers};
use reth_primitives::ChainSpec;
use reth_provider::{BlockProvider, EvmEnvProvider, HeaderProvider, StateProviderFactory};
use reth_rpc::{JwtError, JwtSecret};
use reth_rpc_builder::{
@@ -17,6 +18,7 @@ use reth_transaction_pool::TransactionPool;
use std::{
net::{IpAddr, Ipv4Addr, SocketAddr},
path::Path,
sync::Arc,
};
/// Parameters for configuring the rpc more granularity via CLI
@@ -149,6 +151,7 @@ impl RpcServerArgs {
pool: Pool,
network: Network,
executor: Tasks,
chain_spec: Arc<ChainSpec>,
handle: EngineApiHandle,
) -> Result<ServerHandle, RpcError>
where
@@ -173,6 +176,7 @@ impl RpcServerArgs {
pool,
network,
executor,
chain_spec,
handle,
socket_address,
secret,

View File

@@ -13,7 +13,7 @@ use events::NodeEvent;
use eyre::Context;
use fdlimit::raise_fd_limit;
use futures::{pin_mut, stream::select as stream_select, Stream, StreamExt};
use reth_beacon_consensus::BeaconConsensus;
use reth_beacon_consensus::{BeaconConsensus, BeaconConsensusEngine, BeaconEngineMessage};
use reth_db::{
database::Database,
mdbx::{Env, WriteMap},
@@ -25,6 +25,10 @@ use reth_downloaders::{
bodies::bodies::BodiesDownloaderBuilder,
headers::reverse_headers::ReverseHeadersDownloaderBuilder,
};
use reth_executor::{
blockchain_tree::{config::BlockchainTreeConfig, externals::TreeExternals, BlockchainTree},
Factory,
};
use reth_interfaces::{
consensus::{Consensus, ForkchoiceState},
p2p::{
@@ -61,7 +65,10 @@ use std::{
path::PathBuf,
sync::Arc,
};
use tokio::sync::{mpsc::unbounded_channel, watch};
use tokio::sync::{
mpsc::{unbounded_channel, UnboundedReceiver, UnboundedSender},
oneshot, watch,
};
use tracing::*;
pub mod events;
@@ -219,9 +226,37 @@ impl Command {
info!(target: "reth::cli", "Continuous sync mode enabled");
}
// TODO: This will be fixed with the sync controller (https://github.com/paradigmxyz/reth/pull/1662)
let (tx, _rx) = watch::channel(ForkchoiceState::default());
let engine_api_handle = self.init_engine_api(Arc::clone(&db), tx, &ctx.task_executor);
let (consensus_engine_tx, consensus_engine_rx) = unbounded_channel();
// Forward the `debug.tip` as forkchoice state to the consensus engine.
// This will initiate the sync up to the provided tip.
let _tip_rx = match self.tip {
Some(tip) => {
let (tip_tx, tip_rx) = oneshot::channel();
let state = ForkchoiceState {
head_block_hash: tip,
finalized_block_hash: tip,
safe_block_hash: tip,
};
consensus_engine_tx.send(BeaconEngineMessage::ForkchoiceUpdated {
state,
payload_attrs: None,
tx: tip_tx,
})?;
debug!(target: "reth::cli", %tip, "Tip manually set");
Some(tip_rx)
}
None => {
let warn_msg = "No tip specified. \
reth cannot communicate with consensus clients, \
so a tip must manually be provided for the online stages with --debug.tip <HASH>.";
warn!(target: "reth::cli", warn_msg);
None
}
};
let engine_api_handle =
self.init_engine_api(Arc::clone(&db), consensus_engine_tx, &ctx.task_executor);
info!(target: "reth::cli", "Engine API handler initialized");
let _auth_server = self
@@ -231,12 +266,13 @@ impl Command {
transaction_pool,
network.clone(),
ctx.task_executor.clone(),
self.chain.clone(),
engine_api_handle,
)
.await?;
info!(target: "reth::cli", "Started Auth server");
let (mut pipeline, events) = self
let (pipeline, events) = self
.build_networked_pipeline(
&mut config,
network.clone(),
@@ -246,29 +282,22 @@ impl Command {
)
.await?;
if let Some(tip) = self.tip {
pipeline.set_tip(tip);
debug!(target: "reth::cli", %tip, "Tip manually set");
} else {
let warn_msg = "No tip specified. \
reth cannot communicate with consensus clients, \
so a tip must manually be provided for the online stages with --debug.tip <HASH>.";
warn!(target: "reth::cli", warn_msg);
}
ctx.task_executor.spawn(events::handle_events(Some(network.clone()), events));
// Run pipeline
let beacon_consensus_engine =
self.build_consensus_engine(db.clone(), consensus, pipeline, consensus_engine_rx)?;
// Run consensus engine
let (rx, tx) = tokio::sync::oneshot::channel();
info!(target: "reth::cli", "Starting sync pipeline");
ctx.task_executor.spawn_critical_blocking("pipeline task", async move {
let res = pipeline.run(db.clone()).await;
info!(target: "reth::cli", "Starting consensus engine");
ctx.task_executor.spawn_critical_blocking("consensus engine", async move {
let res = beacon_consensus_engine.await;
let _ = rx.send(res);
});
tx.await??;
info!(target: "reth::cli", "Pipeline has finished.");
info!(target: "reth::cli", "Consensus engine has exited.");
if self.terminate {
Ok(())
@@ -327,6 +356,26 @@ impl Command {
Ok((pipeline, events))
}
fn build_consensus_engine<DB, U, C>(
&self,
db: Arc<DB>,
consensus: C,
pipeline: Pipeline<DB, U>,
message_rx: UnboundedReceiver<BeaconEngineMessage>,
) -> eyre::Result<BeaconConsensusEngine<DB, U, C, Factory>>
where
DB: Database + Unpin + 'static,
U: SyncStateUpdater + Unpin + 'static,
C: Consensus + Unpin + 'static,
{
let executor_factory = Factory::new(self.chain.clone());
let tree_externals =
TreeExternals::new(db.clone(), consensus, executor_factory, self.chain.clone());
let blockchain_tree = BlockchainTree::new(tree_externals, BlockchainTreeConfig::default())?;
Ok(BeaconConsensusEngine::new(db, pipeline, blockchain_tree, message_rx, self.max_block))
}
fn load_config(&self) -> eyre::Result<Config> {
confy::load_path::<Config>(&self.config).wrap_err("Could not load config")
}
@@ -355,7 +404,7 @@ impl Command {
fn init_engine_api(
&self,
db: Arc<Env<WriteMap>>,
forkchoice_state_tx: watch::Sender<ForkchoiceState>,
engine_tx: UnboundedSender<BeaconEngineMessage>,
task_executor: &TaskExecutor,
) -> EngineApiHandle {
let (message_tx, message_rx) = unbounded_channel();
@@ -363,7 +412,7 @@ impl Command {
ShareableDatabase::new(db, self.chain.clone()),
self.chain.clone(),
message_rx,
forkchoice_state_tx,
engine_tx,
);
task_executor.spawn_critical("engine API task", engine_api);
message_tx