feat: integrate builder (#6611)

This commit is contained in:
Matthias Seitz
2024-02-29 17:50:04 +01:00
committed by GitHub
parent 7d36206dfe
commit c5955f1305
73 changed files with 2201 additions and 3022 deletions

View File

@@ -4,32 +4,33 @@
use crate::{
components::{
FullNodeComponents, FullNodeComponentsAdapter, NodeComponents, NodeComponentsBuilder,
ComponentsBuilder, FullNodeComponents, FullNodeComponentsAdapter, NodeComponents,
NodeComponentsBuilder, PoolBuilder,
},
hooks::NodeHooks,
node::{FullNode, FullNodeTypes, FullNodeTypesAdapter, NodeTypes},
rpc::{RethRpcServerHandles, RpcContext, RpcHooks},
NodeHandle,
Node, NodeHandle,
};
use eyre::Context;
use futures::{future::Either, stream, stream_select, StreamExt};
use reth_beacon_consensus::{
hooks::{EngineHooks, PruneHook},
hooks::{EngineHooks, PruneHook, StaticFileHook},
BeaconConsensusEngine,
};
use reth_blockchain_tree::{BlockchainTreeConfig, ShareableBlockchainTree};
use reth_db::{
database::Database,
database_metrics::{DatabaseMetadata, DatabaseMetrics},
test_utils::{create_test_rw_db, TempDatabase},
DatabaseEnv,
};
use reth_interfaces::p2p::either::EitherDownloader;
use reth_network::{
transactions::{TransactionFetcherConfig, TransactionsManagerConfig},
NetworkBuilder, NetworkEvents, NetworkHandle,
};
use reth_network::{NetworkBuilder, NetworkConfig, NetworkEvents, NetworkHandle};
use reth_node_core::{
cli::config::{PayloadBuilderConfig, RethRpcConfig, RethTransactionPoolConfig},
dirs::{ChainPath, DataDirPath},
dirs::{ChainPath, DataDirPath, MaybePlatformPath},
engine_api_store::EngineApiStore,
events::cl::ConsensusLayerHealthEvents,
exit::NodeExitFuture,
init::init_genesis,
@@ -42,13 +43,14 @@ use reth_primitives::{
format_ether, ChainSpec,
};
use reth_provider::{providers::BlockchainProvider, ChainSpecProvider, ProviderFactory};
use reth_prune::{PrunerBuilder, PrunerEvent};
use reth_prune::PrunerBuilder;
use reth_revm::EvmProcessorFactory;
use reth_rpc_engine_api::EngineApi;
use reth_static_file::StaticFileProducer;
use reth_tasks::TaskExecutor;
use reth_tracing::tracing::{debug, info};
use reth_transaction_pool::{PoolConfig, TransactionPool};
use std::sync::Arc;
use std::{str::FromStr, sync::Arc};
use tokio::sync::{mpsc::unbounded_channel, oneshot};
/// The builtin provider type of the reth node.
@@ -56,6 +58,9 @@ use tokio::sync::{mpsc::unbounded_channel, oneshot};
type RethFullProviderType<DB, Evm> =
BlockchainProvider<DB, ShareableBlockchainTree<DB, EvmProcessorFactory<Evm>>>;
type RethFullAdapter<DB, N> =
FullNodeTypesAdapter<N, DB, RethFullProviderType<DB, <N as NodeTypes>::Evm>>;
/// Declaratively construct a node.
///
/// [`NodeBuilder`] provides a [builder-like interface][builder] for composing
@@ -69,11 +74,19 @@ type RethFullProviderType<DB, Evm> =
/// [ConfigureEvm](reth_node_api::evm::ConfigureEvm), the database [Database] and finally all the
/// components of the node that are downstream of those types, these include:
///
/// - The transaction pool: [PoolBuilder](crate::components::PoolBuilder)
/// - The transaction pool: [PoolBuilder]
/// - The network: [NetworkBuilder](crate::components::NetworkBuilder)
/// - The payload builder: [PayloadBuilder](crate::components::PayloadServiceBuilder)
///
/// Finally, the node is ready to launch [NodeBuilder::launch]
/// Once all the components are configured, the node is ready to be launched.
///
/// On launch the builder returns a fully type aware [NodeHandle] that has access to all the
/// configured components and can interact with the node.
///
/// There are convenience functions for networks that come with a preset of types and components via
/// the [Node] trait, see `reth_node_ethereum::EthereumNode` or `reth_node_optimism::OptimismNode`.
///
/// The [NodeBuilder::node] function configures the node's types and components in one step.
///
/// [builder]: https://doc.rust-lang.org/1.0.0/style/ownership/builders.html
pub struct NodeBuilder<DB, State> {
@@ -122,15 +135,40 @@ impl NodeBuilder<(), InitState> {
}
impl<DB> NodeBuilder<DB, InitState> {
/// Configures the additional external context, e.g. additional context captured via CLI args.
/// Configures the underlying database that the node will use.
pub fn with_database<D>(self, database: D) -> NodeBuilder<D, InitState> {
NodeBuilder { config: self.config, state: self.state, database }
}
/// Preconfigure the builder with the context to launch the node.
///
/// This provides the task executor and the data directory for the node.
pub fn with_launch_context(
self,
task_executor: TaskExecutor,
data_dir: ChainPath<DataDirPath>,
) -> WithLaunchContext<DB, InitState> {
WithLaunchContext { builder: self, task_executor, data_dir }
}
/// Creates an _ephemeral_ preconfigured node for testing purposes.
pub fn testing_node(
self,
task_executor: TaskExecutor,
) -> WithLaunchContext<Arc<TempDatabase<DatabaseEnv>>, InitState> {
let db = create_test_rw_db();
let db_path_str = db.path().to_str().expect("Path is not valid unicode");
let path =
MaybePlatformPath::<DataDirPath>::from_str(db_path_str).expect("Path is not valid");
let data_dir = path.unwrap_or_chain_default(self.config.chain.chain);
WithLaunchContext { builder: self.with_database(db), task_executor, data_dir }
}
}
impl<DB> NodeBuilder<DB, InitState>
where
DB: Database + Clone + 'static,
DB: Database + Unpin + Clone + 'static,
{
/// Configures the types of the node.
pub fn with_types<T>(self, types: T) -> NodeBuilder<DB, TypesState<T, DB>>
@@ -143,6 +181,43 @@ where
database: self.database,
}
}
/// Preconfigures the node with a specific node implementation.
///
/// This is a convenience method that sets the node's types and components in one call.
pub fn node<N>(
self,
node: N,
) -> NodeBuilder<
DB,
ComponentsState<
N,
ComponentsBuilder<
RethFullAdapter<DB, N>,
N::PoolBuilder,
N::PayloadBuilder,
N::NetworkBuilder,
>,
FullNodeComponentsAdapter<
RethFullAdapter<DB, N>,
<N::PoolBuilder as PoolBuilder<RethFullAdapter<DB, N>>>::Pool,
>,
>,
>
where
N: Node<FullNodeTypesAdapter<N, DB, RethFullProviderType<DB, <N as NodeTypes>::Evm>>>,
N::PoolBuilder: PoolBuilder<RethFullAdapter<DB, N>>,
N::NetworkBuilder: crate::components::NetworkBuilder<
RethFullAdapter<DB, N>,
<N::PoolBuilder as PoolBuilder<RethFullAdapter<DB, N>>>::Pool,
>,
N::PayloadBuilder: crate::components::PayloadServiceBuilder<
RethFullAdapter<DB, N>,
<N::PoolBuilder as PoolBuilder<RethFullAdapter<DB, N>>>::Pool,
>,
{
self.with_types(node.clone()).with_components(node.components())
}
}
impl<DB, Types> NodeBuilder<DB, TypesState<Types, DB>>
@@ -216,40 +291,6 @@ where
}
}
/// Resets the setup process to the components stage.
///
/// CAUTION: All previously configured hooks will be lost.
pub fn fuse_components<C>(
self,
components_builder: C,
) -> NodeBuilder<
DB,
ComponentsState<
Types,
C,
FullNodeComponentsAdapter<
FullNodeTypesAdapter<Types, DB, RethFullProviderType<DB, Types::Evm>>,
C::Pool,
>,
>,
>
where
C: NodeComponentsBuilder<
FullNodeTypesAdapter<Types, DB, RethFullProviderType<DB, Types::Evm>>,
>,
{
NodeBuilder {
config: self.config,
database: self.database,
state: ComponentsState {
types: self.state.types,
components_builder,
hooks: NodeHooks::new(),
rpc: RpcHooks::new(),
},
}
}
/// Sets the hook that is run once the node's components are initialized.
pub fn on_component_initialized<F>(mut self, hook: F) -> Self
where
@@ -259,6 +300,7 @@ where
Components::Pool,
>,
) -> eyre::Result<()>
+ Send
+ 'static,
{
self.state.hooks.set_on_component_initialized(hook);
@@ -276,6 +318,7 @@ where
>,
>,
) -> eyre::Result<()>
+ Send
+ 'static,
{
self.state.hooks.set_on_node_started(hook);
@@ -295,6 +338,7 @@ where
>,
RethRpcServerHandles,
) -> eyre::Result<()>
+ Send
+ 'static,
{
self.state.rpc.set_on_rpc_started(hook);
@@ -313,6 +357,7 @@ where
>,
>,
) -> eyre::Result<()>
+ Send
+ 'static,
{
self.state.rpc.set_extend_rpc_modules(hook);
@@ -320,6 +365,11 @@ where
}
/// Launches the node and returns a handle to it.
///
/// This bootstraps the node internals, creates all the components with the provider
/// [NodeComponentsBuilder] and launches the node.
///
/// Returns a [NodeHandle] that can be used to interact with the node.
pub async fn launch(
self,
executor: TaskExecutor,
@@ -332,7 +382,7 @@ where
>,
>,
> {
// get config
// get config from file
let reth_config = self.load_config(&data_dir)?;
let Self {
@@ -354,20 +404,14 @@ where
database.clone(),
Arc::clone(&config.chain),
data_dir.static_files_path(),
)?;
// configure static_file_producer
let static_file_producer = reth_static_file::StaticFileProducer::new(
provider_factory.clone(),
provider_factory.static_file_provider(),
config.prune_config()?.unwrap_or_default().segments,
);
)?
.with_static_files_metrics();
debug!(target: "reth::cli", chain=%config.chain.chain, genesis=?config.chain.genesis_hash(), "Initializing genesis");
let genesis_hash = init_genesis(provider_factory.clone())?;
info!(target: "reth::cli", "{}", config.chain.display_hardforks());
info!(target: "reth::cli", "{}",config.chain.display_hardforks());
let consensus = config.consensus();
@@ -412,7 +456,7 @@ where
debug!(target: "reth::cli", "creating components");
let NodeComponents { transaction_pool, network, payload_builder } =
components_builder.build_components(&ctx)?;
components_builder.build_components(&ctx).await?;
let BuilderContext {
provider: blockchain_db,
@@ -438,8 +482,30 @@ where
// create pipeline
let network_client = network.fetch_client().await?;
let (consensus_engine_tx, consensus_engine_rx) = unbounded_channel();
let (consensus_engine_tx, mut consensus_engine_rx) = unbounded_channel();
if let Some(store_path) = config.debug.engine_api_store.clone() {
debug!(target: "reth::cli", "spawning engine API store");
let (engine_intercept_tx, engine_intercept_rx) = unbounded_channel();
let engine_api_store = EngineApiStore::new(store_path);
executor.spawn_critical(
"engine api interceptor",
engine_api_store.intercept(consensus_engine_rx, engine_intercept_tx),
);
consensus_engine_rx = engine_intercept_rx;
};
let max_block = config.max_block(&network_client, provider_factory.clone()).await?;
let mut hooks = EngineHooks::new();
let mut static_file_producer = StaticFileProducer::new(
provider_factory.clone(),
provider_factory.static_file_provider(),
prune_config.clone().unwrap_or_default().segments,
);
let static_file_producer_events = static_file_producer.events();
hooks.add(StaticFileHook::new(static_file_producer.clone(), Box::new(executor.clone())));
info!(target: "reth::cli", "StaticFileProducer initialized");
// Configure the pipeline
let (mut pipeline, client) = if config.dev.dev {
@@ -504,22 +570,16 @@ where
let pipeline_events = pipeline.events();
let initial_target = config.initial_pipeline_target(genesis_hash);
let mut hooks = EngineHooks::new();
let pruner_events = if let Some(prune_config) = prune_config {
let mut pruner = PrunerBuilder::new(prune_config.clone())
.max_reorg_depth(tree_config.max_reorg_depth() as usize)
.prune_delete_limit(config.chain.prune_delete_limit)
.build(provider_factory);
let prune_config = prune_config.unwrap_or_default();
let mut pruner = PrunerBuilder::new(prune_config.clone())
.max_reorg_depth(tree_config.max_reorg_depth() as usize)
.prune_delete_limit(config.chain.prune_delete_limit)
.build(provider_factory.clone());
let events = pruner.events();
hooks.add(PruneHook::new(pruner, Box::new(executor.clone())));
info!(target: "reth::cli", ?prune_config, "Pruner initialized");
Either::Left(events)
} else {
Either::Right(stream::empty::<PrunerEvent>())
};
let pruner_events = pruner.events();
hooks.add(PruneHook::new(pruner, Box::new(executor.clone())));
info!(target: "reth::cli", ?prune_config, "Pruner initialized");
// Configure the consensus engine
let (beacon_consensus_engine, beacon_engine_handle) = BeaconConsensusEngine::with_channel(
@@ -551,7 +611,8 @@ where
} else {
Either::Right(stream::empty())
},
pruner_events.map(Into::into)
pruner_events.map(Into::into),
static_file_producer_events.map(Into::into)
);
executor.spawn_critical(
"events task",
@@ -613,7 +674,7 @@ where
network,
provider,
payload_builder,
executor,
task_executor: executor,
rpc_server_handles,
rpc_registry,
config,
@@ -638,6 +699,283 @@ where
}
}
/// A [NodeBuilder] with it's launch context already configured.
///
/// This exposes the same methods as [NodeBuilder] but with the launch context already configured,
/// See [WithLaunchContext::launch]
pub struct WithLaunchContext<DB, State> {
builder: NodeBuilder<DB, State>,
task_executor: TaskExecutor,
data_dir: ChainPath<DataDirPath>,
}
impl<DB, State> WithLaunchContext<DB, State> {
/// Returns a reference to the node builder's config.
pub fn config(&self) -> &NodeConfig {
self.builder.config()
}
/// Returns a reference to the task executor.
pub fn task_executor(&self) -> &TaskExecutor {
&self.task_executor
}
/// Returns a reference to the data directory.
pub fn data_dir(&self) -> &ChainPath<DataDirPath> {
&self.data_dir
}
}
impl<DB> WithLaunchContext<DB, InitState>
where
DB: Database + Clone + Unpin + 'static,
{
/// Configures the types of the node.
pub fn with_types<T>(self, types: T) -> WithLaunchContext<DB, TypesState<T, DB>>
where
T: NodeTypes,
{
WithLaunchContext {
builder: self.builder.with_types(types),
task_executor: self.task_executor,
data_dir: self.data_dir,
}
}
/// Preconfigures the node with a specific node implementation.
pub fn node<N>(
self,
node: N,
) -> WithLaunchContext<
DB,
ComponentsState<
N,
ComponentsBuilder<
RethFullAdapter<DB, N>,
N::PoolBuilder,
N::PayloadBuilder,
N::NetworkBuilder,
>,
FullNodeComponentsAdapter<
RethFullAdapter<DB, N>,
<N::PoolBuilder as PoolBuilder<RethFullAdapter<DB, N>>>::Pool,
>,
>,
>
where
N: Node<FullNodeTypesAdapter<N, DB, RethFullProviderType<DB, <N as NodeTypes>::Evm>>>,
N::PoolBuilder: PoolBuilder<RethFullAdapter<DB, N>>,
N::NetworkBuilder: crate::components::NetworkBuilder<
RethFullAdapter<DB, N>,
<N::PoolBuilder as PoolBuilder<RethFullAdapter<DB, N>>>::Pool,
>,
N::PayloadBuilder: crate::components::PayloadServiceBuilder<
RethFullAdapter<DB, N>,
<N::PoolBuilder as PoolBuilder<RethFullAdapter<DB, N>>>::Pool,
>,
{
self.with_types(node.clone()).with_components(node.components())
}
}
impl<DB> WithLaunchContext<DB, InitState>
where
DB: Database + DatabaseMetrics + DatabaseMetadata + Clone + Unpin + 'static,
{
/// Launches a preconfigured [Node]
///
/// This bootstraps the node internals, creates all the components with the given [Node] type
/// and launches the node.
///
/// Returns a [NodeHandle] that can be used to interact with the node.
pub async fn launch_node<N>(
self,
node: N,
) -> eyre::Result<
NodeHandle<
FullNodeComponentsAdapter<
RethFullAdapter<DB, N>,
<N::PoolBuilder as PoolBuilder<RethFullAdapter<DB, N>>>::Pool,
>,
>,
>
where
N: Node<FullNodeTypesAdapter<N, DB, RethFullProviderType<DB, <N as NodeTypes>::Evm>>>,
N::PoolBuilder: PoolBuilder<RethFullAdapter<DB, N>>,
N::NetworkBuilder: crate::components::NetworkBuilder<
RethFullAdapter<DB, N>,
<N::PoolBuilder as PoolBuilder<RethFullAdapter<DB, N>>>::Pool,
>,
N::PayloadBuilder: crate::components::PayloadServiceBuilder<
RethFullAdapter<DB, N>,
<N::PoolBuilder as PoolBuilder<RethFullAdapter<DB, N>>>::Pool,
>,
{
self.node(node).launch().await
}
}
impl<DB, Types> WithLaunchContext<DB, TypesState<Types, DB>>
where
Types: NodeTypes,
DB: Database + Clone + Unpin + 'static,
{
/// Configures the node's components.
///
/// The given components builder is used to create the components of the node when it is
/// launched.
pub fn with_components<Components>(
self,
components_builder: Components,
) -> WithLaunchContext<
DB,
ComponentsState<
Types,
Components,
FullNodeComponentsAdapter<
FullNodeTypesAdapter<Types, DB, RethFullProviderType<DB, Types::Evm>>,
Components::Pool,
>,
>,
>
where
Components: NodeComponentsBuilder<
FullNodeTypesAdapter<Types, DB, RethFullProviderType<DB, Types::Evm>>,
>,
{
WithLaunchContext {
builder: self.builder.with_components(components_builder),
task_executor: self.task_executor,
data_dir: self.data_dir,
}
}
}
impl<DB, Types, Components>
WithLaunchContext<
DB,
ComponentsState<
Types,
Components,
FullNodeComponentsAdapter<
FullNodeTypesAdapter<Types, DB, RethFullProviderType<DB, Types::Evm>>,
Components::Pool,
>,
>,
>
where
DB: Database + DatabaseMetrics + DatabaseMetadata + Clone + Unpin + 'static,
Types: NodeTypes,
Components: NodeComponentsBuilder<
FullNodeTypesAdapter<Types, DB, RethFullProviderType<DB, Types::Evm>>,
>,
{
/// Apply a function to the components builder.
pub fn map_components(self, f: impl FnOnce(Components) -> Components) -> Self {
Self {
builder: self.builder.map_components(f),
task_executor: self.task_executor,
data_dir: self.data_dir,
}
}
/// Sets the hook that is run once the node's components are initialized.
pub fn on_component_initialized<F>(mut self, hook: F) -> Self
where
F: Fn(
FullNodeComponentsAdapter<
FullNodeTypesAdapter<Types, DB, RethFullProviderType<DB, Types::Evm>>,
Components::Pool,
>,
) -> eyre::Result<()>
+ Send
+ 'static,
{
self.builder.state.hooks.set_on_component_initialized(hook);
self
}
/// Sets the hook that is run once the node has started.
pub fn on_node_started<F>(mut self, hook: F) -> Self
where
F: Fn(
FullNode<
FullNodeComponentsAdapter<
FullNodeTypesAdapter<Types, DB, RethFullProviderType<DB, Types::Evm>>,
Components::Pool,
>,
>,
) -> eyre::Result<()>
+ Send
+ 'static,
{
self.builder.state.hooks.set_on_node_started(hook);
self
}
/// Sets the hook that is run once the rpc server is started.
pub fn on_rpc_started<F>(mut self, hook: F) -> Self
where
F: Fn(
RpcContext<
'_,
FullNodeComponentsAdapter<
FullNodeTypesAdapter<Types, DB, RethFullProviderType<DB, Types::Evm>>,
Components::Pool,
>,
>,
RethRpcServerHandles,
) -> eyre::Result<()>
+ Send
+ 'static,
{
self.builder.state.rpc.set_on_rpc_started(hook);
self
}
/// Sets the hook that is run to configure the rpc modules.
pub fn extend_rpc_modules<F>(mut self, hook: F) -> Self
where
F: Fn(
RpcContext<
'_,
FullNodeComponentsAdapter<
FullNodeTypesAdapter<Types, DB, RethFullProviderType<DB, Types::Evm>>,
Components::Pool,
>,
>,
) -> eyre::Result<()>
+ Send
+ 'static,
{
self.builder.state.rpc.set_extend_rpc_modules(hook);
self
}
/// Launches the node and returns a handle to it.
pub async fn launch(
self,
) -> eyre::Result<
NodeHandle<
FullNodeComponentsAdapter<
FullNodeTypesAdapter<Types, DB, RethFullProviderType<DB, Types::Evm>>,
Components::Pool,
>,
>,
> {
let Self { builder, task_executor, data_dir } = self;
builder.launch(task_executor, data_dir).await
}
/// Check that the builder can be launched
///
/// This is useful when writing tests to ensure that the builder is configured correctly.
pub fn check_launch(self) -> Self {
self
}
}
/// Captures the necessary context for building the components of the node.
#[derive(Debug)]
pub struct BuilderContext<Node: FullNodeTypes> {
@@ -712,6 +1050,17 @@ impl<Node: FullNodeTypes> BuilderContext<Node> {
self.config.builder.clone()
}
/// Returns the default network config for the node.
pub fn network_config(&self) -> eyre::Result<NetworkConfig<Node::Provider>> {
self.config.network_config(
&self.reth_config,
self.provider.clone(),
self.executor.clone(),
self.head,
self.data_dir(),
)
}
/// Creates the [NetworkBuilder] for the node.
pub async fn network_builder(&self) -> eyre::Result<NetworkBuilder<Node::Provider, (), ()>> {
self.config
@@ -725,11 +1074,8 @@ impl<Node: FullNodeTypes> BuilderContext<Node> {
.await
}
/// Creates the [NetworkBuilder] for the node and blocks until it is ready.
pub fn network_builder_blocking(&self) -> eyre::Result<NetworkBuilder<Node::Provider, (), ()>> {
self.executor.block_on(self.network_builder())
}
/// Convenience function to start the network.
///
/// Spawns the configured network and associated tasks and returns the [NetworkHandle] connected
/// to that network.
pub fn start_network<Pool>(
@@ -741,17 +1087,7 @@ impl<Node: FullNodeTypes> BuilderContext<Node> {
Pool: TransactionPool + Unpin + 'static,
{
let (handle, network, txpool, eth) = builder
.transactions(
pool,
TransactionsManagerConfig {
transaction_fetcher_config: TransactionFetcherConfig::new(
self.config.network.soft_limit_byte_size_pooled_transactions_response,
self.config
.network
.soft_limit_byte_size_pooled_transactions_response_on_pack_request,
),
},
)
.transactions(pool, Default::default())
.request_handler(self.provider().clone())
.split_with_handle();