mirror of
https://github.com/paradigmxyz/reth.git
synced 2026-04-30 03:01:58 -04:00
feat(node-builder): move network setup to node-builder launch/ BuilderContext types (#8648)
This commit is contained in:
@@ -40,9 +40,6 @@ reth-beacon-consensus.workspace = true
|
||||
# ethereum
|
||||
alloy-rpc-types-engine.workspace = true
|
||||
|
||||
# ethereum
|
||||
discv5.workspace = true
|
||||
|
||||
# async
|
||||
tokio.workspace = true
|
||||
tokio-util.workspace = true
|
||||
|
||||
@@ -2,36 +2,29 @@
|
||||
|
||||
use crate::{
|
||||
args::{
|
||||
get_secret_key, DatabaseArgs, DatadirArgs, DebugArgs, DevArgs, DiscoveryArgs, NetworkArgs,
|
||||
PayloadBuilderArgs, PruningArgs, RpcServerArgs, TxPoolArgs,
|
||||
DatabaseArgs, DatadirArgs, DebugArgs, DevArgs, NetworkArgs, PayloadBuilderArgs,
|
||||
PruningArgs, RpcServerArgs, TxPoolArgs,
|
||||
},
|
||||
dirs::{ChainPath, DataDirPath},
|
||||
metrics::prometheus_exporter,
|
||||
utils::get_single_header,
|
||||
};
|
||||
use discv5::ListenConfig;
|
||||
use metrics_exporter_prometheus::PrometheusHandle;
|
||||
use once_cell::sync::Lazy;
|
||||
use reth_config::{config::PruneConfig, Config};
|
||||
use reth_config::config::PruneConfig;
|
||||
use reth_db_api::{database::Database, database_metrics::DatabaseMetrics};
|
||||
use reth_network::{NetworkBuilder, NetworkConfig, NetworkManager};
|
||||
use reth_network_p2p::headers::client::HeadersClient;
|
||||
use reth_primitives::{
|
||||
constants::eip4844::MAINNET_KZG_TRUSTED_SETUP, kzg::KzgSettings, stage::StageId,
|
||||
BlockHashOrNumber, BlockNumber, ChainSpec, Head, SealedHeader, B256, MAINNET,
|
||||
};
|
||||
use reth_provider::{
|
||||
providers::StaticFileProvider, BlockHashReader, BlockNumReader, HeaderProvider,
|
||||
ProviderFactory, StageCheckpointReader,
|
||||
providers::StaticFileProvider, BlockHashReader, HeaderProvider, ProviderFactory,
|
||||
StageCheckpointReader,
|
||||
};
|
||||
use reth_storage_errors::provider::ProviderResult;
|
||||
use reth_tasks::TaskExecutor;
|
||||
use secp256k1::SecretKey;
|
||||
use std::{
|
||||
net::{IpAddr, SocketAddr, SocketAddrV4, SocketAddrV6},
|
||||
path::PathBuf,
|
||||
sync::Arc,
|
||||
};
|
||||
use std::{net::SocketAddr, path::PathBuf, sync::Arc};
|
||||
use tracing::*;
|
||||
|
||||
/// The default prometheus recorder handle. We use a global static to ensure that it is only
|
||||
@@ -245,15 +238,6 @@ impl NodeConfig {
|
||||
self
|
||||
}
|
||||
|
||||
/// Get the network secret from the given data dir
|
||||
pub fn network_secret(&self) -> eyre::Result<SecretKey> {
|
||||
let network_secret_path =
|
||||
self.network.p2p_secret_key.clone().unwrap_or_else(|| self.datadir().p2p_secret());
|
||||
debug!(target: "reth::cli", ?network_secret_path, "Loading p2p key file");
|
||||
let secret_key = get_secret_key(&network_secret_path)?;
|
||||
Ok(secret_key)
|
||||
}
|
||||
|
||||
/// Returns the initial pipeline target, based on whether or not the node is running in
|
||||
/// `debug.tip` mode, `debug.continuous` mode, or neither.
|
||||
///
|
||||
@@ -302,38 +286,6 @@ impl NodeConfig {
|
||||
Ok(max_block)
|
||||
}
|
||||
|
||||
/// Create the [`NetworkConfig`] for the node
|
||||
pub fn network_config<C>(
|
||||
&self,
|
||||
config: &Config,
|
||||
client: C,
|
||||
executor: TaskExecutor,
|
||||
head: Head,
|
||||
) -> eyre::Result<NetworkConfig<C>> {
|
||||
info!(target: "reth::cli", "Connecting to P2P network");
|
||||
let secret_key = self.network_secret()?;
|
||||
let default_peers_path = self.datadir().known_peers();
|
||||
Ok(self.load_network_config(config, client, executor, head, secret_key, default_peers_path))
|
||||
}
|
||||
|
||||
/// Create the [`NetworkBuilder`].
|
||||
///
|
||||
/// This only configures it and does not spawn it.
|
||||
pub async fn build_network<C>(
|
||||
&self,
|
||||
config: &Config,
|
||||
client: C,
|
||||
executor: TaskExecutor,
|
||||
head: Head,
|
||||
) -> eyre::Result<NetworkBuilder<C, (), ()>>
|
||||
where
|
||||
C: BlockNumReader,
|
||||
{
|
||||
let network_config = self.network_config(config, client, executor, head)?;
|
||||
let builder = NetworkManager::builder(network_config).await?;
|
||||
Ok(builder)
|
||||
}
|
||||
|
||||
/// Loads '`MAINNET_KZG_TRUSTED_SETUP`'
|
||||
pub fn kzg_settings(&self) -> eyre::Result<Arc<KzgSettings>> {
|
||||
Ok(Arc::clone(&MAINNET_KZG_TRUSTED_SETUP))
|
||||
@@ -454,65 +406,6 @@ impl NodeConfig {
|
||||
}
|
||||
}
|
||||
|
||||
/// Builds the [`NetworkConfig`] with the given [`ProviderFactory`].
|
||||
pub fn load_network_config<C>(
|
||||
&self,
|
||||
config: &Config,
|
||||
client: C,
|
||||
executor: TaskExecutor,
|
||||
head: Head,
|
||||
secret_key: SecretKey,
|
||||
default_peers_path: PathBuf,
|
||||
) -> NetworkConfig<C> {
|
||||
self.network
|
||||
.network_config(config, self.chain.clone(), secret_key, default_peers_path)
|
||||
.with_task_executor(Box::new(executor))
|
||||
.set_head(head)
|
||||
.listener_addr(SocketAddr::new(
|
||||
self.network.addr,
|
||||
// set discovery port based on instance number
|
||||
self.network.port + self.instance - 1,
|
||||
))
|
||||
.disable_discv4_discovery_if(self.chain.chain.is_optimism())
|
||||
.discovery_addr(SocketAddr::new(
|
||||
self.network.discovery.addr,
|
||||
// set discovery port based on instance number
|
||||
self.network.discovery.port + self.instance - 1,
|
||||
))
|
||||
.map_discv5_config_builder(|builder| {
|
||||
let DiscoveryArgs {
|
||||
discv5_addr,
|
||||
discv5_addr_ipv6,
|
||||
discv5_port,
|
||||
discv5_port_ipv6,
|
||||
..
|
||||
} = self.network.discovery;
|
||||
|
||||
// Use rlpx address if none given
|
||||
let discv5_addr_ipv4 = discv5_addr.or(match self.network.addr {
|
||||
IpAddr::V4(ip) => Some(ip),
|
||||
IpAddr::V6(_) => None,
|
||||
});
|
||||
let discv5_addr_ipv6 = discv5_addr_ipv6.or(match self.network.addr {
|
||||
IpAddr::V4(_) => None,
|
||||
IpAddr::V6(ip) => Some(ip),
|
||||
});
|
||||
|
||||
let discv5_port_ipv4 = discv5_port + self.instance - 1;
|
||||
let discv5_port_ipv6 = discv5_port_ipv6 + self.instance - 1;
|
||||
|
||||
builder.discv5_config(
|
||||
discv5::ConfigBuilder::new(ListenConfig::from_two_sockets(
|
||||
discv5_addr_ipv4.map(|addr| SocketAddrV4::new(addr, discv5_port_ipv4)),
|
||||
discv5_addr_ipv6
|
||||
.map(|addr| SocketAddrV6::new(addr, discv5_port_ipv6, 0, 0)),
|
||||
))
|
||||
.build(),
|
||||
)
|
||||
})
|
||||
.build(client)
|
||||
}
|
||||
|
||||
/// Change rpc port numbers based on the instance number, using the inner
|
||||
/// [`RpcServerArgs::adjust_instance_ports`] method.
|
||||
pub fn adjust_instance_ports(&mut self) {
|
||||
|
||||
@@ -52,6 +52,16 @@ tokio = { workspace = true, features = [
|
||||
] }
|
||||
tokio-stream.workspace = true
|
||||
|
||||
# ethereum
|
||||
discv5.workspace = true
|
||||
|
||||
# crypto
|
||||
secp256k1 = { workspace = true, features = [
|
||||
"global-context",
|
||||
"rand-std",
|
||||
"recovery",
|
||||
] }
|
||||
|
||||
## misc
|
||||
aquamarine.workspace = true
|
||||
eyre.workspace = true
|
||||
|
||||
@@ -3,11 +3,13 @@
|
||||
#![allow(clippy::type_complexity, missing_debug_implementations)]
|
||||
|
||||
use crate::{
|
||||
common::WithConfigs,
|
||||
components::NodeComponentsBuilder,
|
||||
node::FullNode,
|
||||
rpc::{RethRpcServerHandles, RpcContext},
|
||||
DefaultNodeLauncher, Node, NodeHandle,
|
||||
};
|
||||
use discv5::ListenConfig;
|
||||
use futures::Future;
|
||||
use reth_db::{
|
||||
test_utils::{create_test_rw_db_with_path, tempdir_path, TempDatabase},
|
||||
@@ -18,12 +20,12 @@ use reth_db_api::{
|
||||
database_metrics::{DatabaseMetadata, DatabaseMetrics},
|
||||
};
|
||||
use reth_exex::ExExContext;
|
||||
use reth_network::{NetworkBuilder, NetworkConfig, NetworkHandle};
|
||||
use reth_network::{NetworkBuilder, NetworkConfig, NetworkHandle, NetworkManager};
|
||||
use reth_node_api::{FullNodeTypes, FullNodeTypesAdapter, NodeTypes};
|
||||
use reth_node_core::{
|
||||
args::DatadirArgs,
|
||||
args::{get_secret_key, DatadirArgs},
|
||||
cli::config::{PayloadBuilderConfig, RethTransactionPoolConfig},
|
||||
dirs::{DataDirPath, MaybePlatformPath},
|
||||
dirs::{ChainPath, DataDirPath, MaybePlatformPath},
|
||||
node_config::NodeConfig,
|
||||
primitives::{kzg::KzgSettings, Head},
|
||||
utils::write_peers_to_file,
|
||||
@@ -32,8 +34,13 @@ use reth_primitives::{constants::eip4844::MAINNET_KZG_TRUSTED_SETUP, ChainSpec};
|
||||
use reth_provider::{providers::BlockchainProvider, ChainSpecProvider};
|
||||
use reth_tasks::TaskExecutor;
|
||||
use reth_transaction_pool::{PoolConfig, TransactionPool};
|
||||
use secp256k1::SecretKey;
|
||||
pub use states::*;
|
||||
use std::sync::Arc;
|
||||
use std::{
|
||||
net::{IpAddr, SocketAddr, SocketAddrV4, SocketAddrV6},
|
||||
path::PathBuf,
|
||||
sync::Arc,
|
||||
};
|
||||
|
||||
mod states;
|
||||
|
||||
@@ -403,10 +410,8 @@ pub struct BuilderContext<Node: FullNodeTypes> {
|
||||
pub(crate) provider: Node::Provider,
|
||||
/// The executor of the node.
|
||||
pub(crate) executor: TaskExecutor,
|
||||
/// The config of the node
|
||||
pub(crate) config: NodeConfig,
|
||||
/// loaded config
|
||||
pub(crate) reth_config: reth_config::Config,
|
||||
/// Config container
|
||||
pub(crate) config_container: WithConfigs,
|
||||
}
|
||||
|
||||
impl<Node: FullNodeTypes> BuilderContext<Node> {
|
||||
@@ -415,10 +420,9 @@ impl<Node: FullNodeTypes> BuilderContext<Node> {
|
||||
head: Head,
|
||||
provider: Node::Provider,
|
||||
executor: TaskExecutor,
|
||||
config: NodeConfig,
|
||||
reth_config: reth_config::Config,
|
||||
config_container: WithConfigs,
|
||||
) -> Self {
|
||||
Self { head, provider, executor, config, reth_config }
|
||||
Self { head, provider, executor, config_container }
|
||||
}
|
||||
|
||||
/// Returns the configured provider to interact with the blockchain.
|
||||
@@ -433,7 +437,12 @@ impl<Node: FullNodeTypes> BuilderContext<Node> {
|
||||
|
||||
/// Returns the config of the node.
|
||||
pub const fn config(&self) -> &NodeConfig {
|
||||
&self.config
|
||||
&self.config_container.config
|
||||
}
|
||||
|
||||
/// Returns the loaded reh.toml config.
|
||||
pub const fn reth_config(&self) -> &reth_config::Config {
|
||||
&self.config_container.toml_config
|
||||
}
|
||||
|
||||
/// Returns the executor of the node.
|
||||
@@ -460,29 +469,14 @@ impl<Node: FullNodeTypes> BuilderContext<Node> {
|
||||
|
||||
/// Returns the config for payload building.
|
||||
pub fn payload_builder_config(&self) -> impl PayloadBuilderConfig {
|
||||
self.config.builder.clone()
|
||||
}
|
||||
|
||||
/// Returns the default network config for the node.
|
||||
pub fn network_config(&self) -> eyre::Result<NetworkConfig<Node::Provider>> {
|
||||
self.config.network_config(
|
||||
&self.reth_config,
|
||||
self.provider.clone(),
|
||||
self.executor.clone(),
|
||||
self.head,
|
||||
)
|
||||
self.config().builder.clone()
|
||||
}
|
||||
|
||||
/// Creates the [`NetworkBuilder`] for the node.
|
||||
pub async fn network_builder(&self) -> eyre::Result<NetworkBuilder<Node::Provider, (), ()>> {
|
||||
self.config
|
||||
.build_network(
|
||||
&self.reth_config,
|
||||
self.provider.clone(),
|
||||
self.executor.clone(),
|
||||
self.head,
|
||||
)
|
||||
.await
|
||||
let network_config = self.network_config()?;
|
||||
let builder = NetworkManager::builder(network_config).await?;
|
||||
Ok(builder)
|
||||
}
|
||||
|
||||
/// Convenience function to start the network.
|
||||
@@ -505,8 +499,8 @@ impl<Node: FullNodeTypes> BuilderContext<Node> {
|
||||
self.executor.spawn_critical("p2p txpool", txpool);
|
||||
self.executor.spawn_critical("p2p eth request handler", eth);
|
||||
|
||||
let default_peers_path = self.config.datadir().known_peers();
|
||||
let known_peers_file = self.config.network.persistent_peers_file(default_peers_path);
|
||||
let default_peers_path = self.config().datadir().known_peers();
|
||||
let known_peers_file = self.config().network.persistent_peers_file(default_peers_path);
|
||||
self.executor.spawn_critical_with_graceful_shutdown_signal(
|
||||
"p2p network task",
|
||||
|shutdown| {
|
||||
@@ -518,6 +512,80 @@ impl<Node: FullNodeTypes> BuilderContext<Node> {
|
||||
|
||||
handle
|
||||
}
|
||||
|
||||
/// Returns the default network config for the node.
|
||||
pub fn network_config(&self) -> eyre::Result<NetworkConfig<Node::Provider>> {
|
||||
let secret_key = self.network_secret(&self.config().datadir())?;
|
||||
let default_peers_path = self.config().datadir().known_peers();
|
||||
Ok(self.build_network_config(secret_key, default_peers_path))
|
||||
}
|
||||
|
||||
/// Get the network secret from the given data dir
|
||||
fn network_secret(&self, data_dir: &ChainPath<DataDirPath>) -> eyre::Result<SecretKey> {
|
||||
let network_secret_path =
|
||||
self.config().network.p2p_secret_key.clone().unwrap_or_else(|| data_dir.p2p_secret());
|
||||
let secret_key = get_secret_key(&network_secret_path)?;
|
||||
Ok(secret_key)
|
||||
}
|
||||
|
||||
/// Builds the [`NetworkConfig`].
|
||||
fn build_network_config(
|
||||
&self,
|
||||
secret_key: SecretKey,
|
||||
default_peers_path: PathBuf,
|
||||
) -> NetworkConfig<Node::Provider> {
|
||||
self.config()
|
||||
.network
|
||||
.network_config(
|
||||
self.reth_config(),
|
||||
self.config().chain.clone(),
|
||||
secret_key,
|
||||
default_peers_path,
|
||||
)
|
||||
.with_task_executor(Box::new(self.executor.clone()))
|
||||
.set_head(self.head)
|
||||
.listener_addr(SocketAddr::new(
|
||||
self.config().network.addr,
|
||||
// set discovery port based on instance number
|
||||
self.config().network.port + self.config().instance - 1,
|
||||
))
|
||||
.disable_discv4_discovery_if(self.config().chain.chain.is_optimism())
|
||||
.discovery_addr(SocketAddr::new(
|
||||
self.config().network.discovery.addr,
|
||||
// set discovery port based on instance number
|
||||
self.config().network.discovery.port + self.config().instance - 1,
|
||||
))
|
||||
.map_discv5_config_builder(|builder| {
|
||||
// Use rlpx address if none given
|
||||
let discv5_addr_ipv4 = self.config().network.discovery.discv5_addr.or(
|
||||
match self.config().network.addr {
|
||||
IpAddr::V4(ip) => Some(ip),
|
||||
IpAddr::V6(_) => None,
|
||||
},
|
||||
);
|
||||
let discv5_addr_ipv6 = self.config().network.discovery.discv5_addr_ipv6.or(
|
||||
match self.config().network.addr {
|
||||
IpAddr::V4(_) => None,
|
||||
IpAddr::V6(ip) => Some(ip),
|
||||
},
|
||||
);
|
||||
|
||||
let discv5_port_ipv4 =
|
||||
self.config().network.discovery.discv5_port + self.config().instance - 1;
|
||||
let discv5_port_ipv6 =
|
||||
self.config().network.discovery.discv5_port_ipv6 + self.config().instance - 1;
|
||||
|
||||
builder.discv5_config(
|
||||
discv5::ConfigBuilder::new(ListenConfig::from_two_sockets(
|
||||
discv5_addr_ipv4.map(|addr| SocketAddrV4::new(addr, discv5_port_ipv4)),
|
||||
discv5_addr_ipv6
|
||||
.map(|addr| SocketAddrV6::new(addr, discv5_port_ipv6, 0, 0)),
|
||||
))
|
||||
.build(),
|
||||
)
|
||||
})
|
||||
.build(self.provider.clone())
|
||||
}
|
||||
}
|
||||
|
||||
impl<Node: FullNodeTypes> std::fmt::Debug for BuilderContext<Node> {
|
||||
@@ -526,7 +594,7 @@ impl<Node: FullNodeTypes> std::fmt::Debug for BuilderContext<Node> {
|
||||
.field("head", &self.head)
|
||||
.field("provider", &std::any::type_name::<Node::Provider>())
|
||||
.field("executor", &self.executor)
|
||||
.field("config", &self.config)
|
||||
.field("config", &self.config())
|
||||
.finish()
|
||||
}
|
||||
}
|
||||
|
||||
@@ -4,6 +4,7 @@ use crate::{
|
||||
builder::{NodeAdapter, NodeAddOns, NodeTypesAdapter},
|
||||
components::{NodeComponents, NodeComponentsBuilder},
|
||||
hooks::NodeHooks,
|
||||
launch::common::WithConfigs,
|
||||
node::FullNode,
|
||||
BuilderContext, NodeBuilderWithComponents, NodeHandle,
|
||||
};
|
||||
@@ -142,12 +143,15 @@ where
|
||||
)),
|
||||
)?;
|
||||
|
||||
let config_container = WithConfigs {
|
||||
config: ctx.node_config().clone(),
|
||||
toml_config: ctx.toml_config().clone(),
|
||||
};
|
||||
let builder_ctx = BuilderContext::new(
|
||||
head,
|
||||
blockchain_db.clone(),
|
||||
ctx.task_executor().clone(),
|
||||
ctx.node_config().clone(),
|
||||
ctx.toml_config().clone(),
|
||||
config_container,
|
||||
);
|
||||
|
||||
debug!(target: "reth::cli", "creating components");
|
||||
|
||||
Reference in New Issue
Block a user