mirror of
https://github.com/paradigmxyz/reth.git
synced 2026-01-29 09:08:05 -05:00
feat: add configure network function to cli ext (#5536)
This commit is contained in:
@@ -1,6 +1,7 @@
|
||||
//! Config traits for various node components.
|
||||
|
||||
use alloy_rlp::Encodable;
|
||||
use reth_network::protocol::IntoRlpxSubProtocol;
|
||||
use reth_primitives::{Bytes, BytesMut};
|
||||
use reth_rpc::{eth::gas_oracle::GasPriceOracleConfig, JwtError, JwtSecret};
|
||||
use reth_rpc_builder::{
|
||||
@@ -102,3 +103,21 @@ pub trait PayloadBuilderConfig {
|
||||
#[cfg(feature = "optimism")]
|
||||
fn compute_pending_block(&self) -> bool;
|
||||
}
|
||||
|
||||
/// A trait that can be used to apply additional configuration to the network.
|
||||
pub trait RethNetworkConfig {
|
||||
/// Adds a new additional protocol to the RLPx sub-protocol list.
|
||||
///
|
||||
/// These additional protocols are negotiated during the RLPx handshake.
|
||||
/// If both peers share the same protocol, the corresponding handler will be included alongside
|
||||
/// the `eth` protocol.
|
||||
///
|
||||
/// See also [ProtocolHandler](reth_network::protocol::ProtocolHandler)
|
||||
fn add_rlpx_sub_protocol(&mut self, protocol: impl IntoRlpxSubProtocol);
|
||||
}
|
||||
|
||||
impl<C> RethNetworkConfig for reth_network::NetworkManager<C> {
|
||||
fn add_rlpx_sub_protocol(&mut self, protocol: impl IntoRlpxSubProtocol) {
|
||||
reth_network::NetworkManager::add_rlpx_sub_protocol(self, protocol);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -10,7 +10,7 @@ use reth_payload_builder::{PayloadBuilderHandle, PayloadBuilderService};
|
||||
use reth_tasks::TaskSpawner;
|
||||
use std::{fmt, marker::PhantomData};
|
||||
|
||||
use crate::cli::components::RethRpcServerHandles;
|
||||
use crate::cli::{components::RethRpcServerHandles, config::RethNetworkConfig};
|
||||
|
||||
/// A trait that allows for extending parts of the CLI with additional functionality.
|
||||
///
|
||||
@@ -35,12 +35,30 @@ impl RethCliExt for () {
|
||||
///
|
||||
/// The functions are invoked during the initialization of the node command in the following order:
|
||||
///
|
||||
/// 1. [on_components_initialized](RethNodeCommandConfig::on_components_initialized)
|
||||
/// 2. [spawn_payload_builder_service](RethNodeCommandConfig::spawn_payload_builder_service)
|
||||
/// 3. [extend_rpc_modules](RethNodeCommandConfig::extend_rpc_modules)
|
||||
/// 4. [on_rpc_server_started](RethNodeCommandConfig::on_rpc_server_started)
|
||||
/// 5. [on_node_started](RethNodeCommandConfig::on_node_started)
|
||||
/// 1. [configure_network](RethNodeCommandConfig::configure_network)
|
||||
/// 2. [on_components_initialized](RethNodeCommandConfig::on_components_initialized)
|
||||
/// 3. [spawn_payload_builder_service](RethNodeCommandConfig::spawn_payload_builder_service)
|
||||
/// 4. [extend_rpc_modules](RethNodeCommandConfig::extend_rpc_modules)
|
||||
/// 5. [on_rpc_server_started](RethNodeCommandConfig::on_rpc_server_started)
|
||||
/// 6. [on_node_started](RethNodeCommandConfig::on_node_started)
|
||||
pub trait RethNodeCommandConfig: fmt::Debug {
|
||||
/// Invoked with the network configuration before the network is configured.
|
||||
///
|
||||
/// This allows additional configuration of the network before it is launched.
|
||||
fn configure_network<Conf, Reth>(
|
||||
&mut self,
|
||||
config: &mut Conf,
|
||||
components: &Reth,
|
||||
) -> eyre::Result<()>
|
||||
where
|
||||
Conf: RethNetworkConfig,
|
||||
Reth: RethNodeComponents,
|
||||
{
|
||||
let _ = config;
|
||||
let _ = components;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Event hook called once all components have been initialized.
|
||||
///
|
||||
/// This is called as soon as the node components have been initialized.
|
||||
@@ -224,6 +242,22 @@ impl<T> NoArgs<T> {
|
||||
}
|
||||
|
||||
impl<T: RethNodeCommandConfig> RethNodeCommandConfig for NoArgs<T> {
|
||||
fn configure_network<Conf, Reth>(
|
||||
&mut self,
|
||||
config: &mut Conf,
|
||||
components: &Reth,
|
||||
) -> eyre::Result<()>
|
||||
where
|
||||
Conf: RethNetworkConfig,
|
||||
Reth: RethNodeComponents,
|
||||
{
|
||||
if let Some(conf) = self.inner_mut() {
|
||||
conf.configure_network(config, components)
|
||||
} else {
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
fn on_components_initialized<Reth: RethNodeComponents>(
|
||||
&mut self,
|
||||
components: &Reth,
|
||||
|
||||
@@ -49,9 +49,7 @@ use reth_interfaces::{
|
||||
},
|
||||
RethResult,
|
||||
};
|
||||
use reth_network::{
|
||||
error::NetworkError, NetworkConfig, NetworkEvents, NetworkHandle, NetworkManager,
|
||||
};
|
||||
use reth_network::{NetworkBuilder, NetworkConfig, NetworkEvents, NetworkHandle, NetworkManager};
|
||||
use reth_network_api::{NetworkInfo, PeersInfo};
|
||||
use reth_primitives::{
|
||||
constants::eip4844::{LoadKzgSettingsError, MAINNET_KZG_TRUSTED_SETUP},
|
||||
@@ -356,25 +354,34 @@ impl<Ext: RethCliExt> NodeCommand<Ext> {
|
||||
secret_key,
|
||||
default_peers_path.clone(),
|
||||
);
|
||||
let network = self
|
||||
.start_network(
|
||||
network_config,
|
||||
&ctx.task_executor,
|
||||
transaction_pool.clone(),
|
||||
default_peers_path,
|
||||
)
|
||||
.await?;
|
||||
info!(target: "reth::cli", peer_id = %network.peer_id(), local_addr = %network.local_addr(), enode = %network.local_node_record(), "Connected to P2P network");
|
||||
debug!(target: "reth::cli", peer_id = ?network.peer_id(), "Full peer ID");
|
||||
let network_client = network.fetch_client().await?;
|
||||
|
||||
let network_client = network_config.client.clone();
|
||||
let mut network_builder = NetworkManager::builder(network_config).await?;
|
||||
|
||||
let components = RethNodeComponentsImpl {
|
||||
provider: blockchain_db.clone(),
|
||||
pool: transaction_pool.clone(),
|
||||
network: network.clone(),
|
||||
network: network_builder.handle(),
|
||||
task_executor: ctx.task_executor.clone(),
|
||||
events: blockchain_db.clone(),
|
||||
};
|
||||
|
||||
// allow network modifications
|
||||
self.ext.configure_network(network_builder.network_mut(), &components)?;
|
||||
|
||||
// launch network
|
||||
let network = self.start_network(
|
||||
network_builder,
|
||||
&ctx.task_executor,
|
||||
transaction_pool.clone(),
|
||||
network_client,
|
||||
default_peers_path,
|
||||
);
|
||||
|
||||
info!(target: "reth::cli", peer_id = %network.peer_id(), local_addr = %network.local_addr(), enode = %network.local_node_record(), "Connected to P2P network");
|
||||
debug!(target: "reth::cli", peer_id = ?network.peer_id(), "Full peer ID");
|
||||
let network_client = network.fetch_client().await?;
|
||||
|
||||
self.ext.on_components_initialized(&components)?;
|
||||
|
||||
debug!(target: "reth::cli", "Spawning payload builder service");
|
||||
@@ -694,23 +701,20 @@ impl<Ext: RethCliExt> NodeCommand<Ext> {
|
||||
|
||||
/// Spawns the configured network and associated tasks and returns the [NetworkHandle] connected
|
||||
/// to that network.
|
||||
async fn start_network<C, Pool>(
|
||||
fn start_network<C, Pool>(
|
||||
&self,
|
||||
config: NetworkConfig<C>,
|
||||
builder: NetworkBuilder<C, (), ()>,
|
||||
task_executor: &TaskExecutor,
|
||||
pool: Pool,
|
||||
client: C,
|
||||
default_peers_path: PathBuf,
|
||||
) -> Result<NetworkHandle, NetworkError>
|
||||
) -> NetworkHandle
|
||||
where
|
||||
C: BlockReader + HeaderProvider + Clone + Unpin + 'static,
|
||||
Pool: TransactionPool + Unpin + 'static,
|
||||
{
|
||||
let client = config.client.clone();
|
||||
let (handle, network, txpool, eth) = NetworkManager::builder(config)
|
||||
.await?
|
||||
.transactions(pool)
|
||||
.request_handler(client)
|
||||
.split_with_handle();
|
||||
let (handle, network, txpool, eth) =
|
||||
builder.transactions(pool).request_handler(client).split_with_handle();
|
||||
|
||||
task_executor.spawn_critical("p2p txpool", txpool);
|
||||
task_executor.spawn_critical("p2p eth request handler", eth);
|
||||
@@ -720,7 +724,7 @@ impl<Ext: RethCliExt> NodeCommand<Ext> {
|
||||
run_network_until_shutdown(shutdown, network, known_peers_file)
|
||||
});
|
||||
|
||||
Ok(handle)
|
||||
handle
|
||||
}
|
||||
|
||||
/// Fetches the head block from the database.
|
||||
|
||||
@@ -28,6 +28,21 @@ impl<C, Tx, Eth> NetworkBuilder<C, Tx, Eth> {
|
||||
(network, transactions, request_handler)
|
||||
}
|
||||
|
||||
/// Returns the network manager.
|
||||
pub fn network(&self) -> &NetworkManager<C> {
|
||||
&self.network
|
||||
}
|
||||
|
||||
/// Returns the mutable network manager.
|
||||
pub fn network_mut(&mut self) -> &mut NetworkManager<C> {
|
||||
&mut self.network
|
||||
}
|
||||
|
||||
/// Returns the handle to the network.
|
||||
pub fn handle(&self) -> NetworkHandle {
|
||||
self.network.handle().clone()
|
||||
}
|
||||
|
||||
/// Consumes the type and returns all fields and also return a [`NetworkHandle`].
|
||||
pub fn split_with_handle(self) -> (NetworkHandle, NetworkManager<C>, Tx, Eth) {
|
||||
let NetworkBuilder { network, transactions, request_handler } = self;
|
||||
|
||||
@@ -26,6 +26,7 @@ use crate::{
|
||||
metrics::{DisconnectMetrics, NetworkMetrics, NETWORK_POOL_TRANSACTIONS_SCOPE},
|
||||
network::{NetworkHandle, NetworkHandleMessage},
|
||||
peers::{PeersHandle, PeersManager},
|
||||
protocol::IntoRlpxSubProtocol,
|
||||
session::SessionManager,
|
||||
state::NetworkState,
|
||||
swarm::{NetworkConnectionState, Swarm, SwarmEvent},
|
||||
@@ -142,6 +143,11 @@ impl<C> NetworkManager<C> {
|
||||
self.to_eth_request_handler = Some(tx);
|
||||
}
|
||||
|
||||
/// Adds an additional protocol handler to the RLPx sub-protocol list.
|
||||
pub fn add_rlpx_sub_protocol(&mut self, protocol: impl IntoRlpxSubProtocol) {
|
||||
self.swarm.add_rlpx_sub_protocol(protocol)
|
||||
}
|
||||
|
||||
/// Returns the [`NetworkHandle`] that can be cloned and shared.
|
||||
///
|
||||
/// The [`NetworkHandle`] can be used to interact with this [`NetworkManager`]
|
||||
|
||||
@@ -116,6 +116,12 @@ where
|
||||
}
|
||||
}
|
||||
|
||||
impl IntoRlpxSubProtocol for RlpxSubProtocol {
|
||||
fn into_rlpx_sub_protocol(self) -> RlpxSubProtocol {
|
||||
self
|
||||
}
|
||||
}
|
||||
|
||||
/// Additional RLPx-based sub-protocols.
|
||||
#[derive(Debug, Default)]
|
||||
pub struct RlpxSubProtocols {
|
||||
|
||||
@@ -48,7 +48,7 @@ pub use handle::{
|
||||
SessionCommand,
|
||||
};
|
||||
|
||||
use crate::protocol::RlpxSubProtocols;
|
||||
use crate::protocol::{IntoRlpxSubProtocol, RlpxSubProtocols};
|
||||
pub use reth_network_api::{Direction, PeerInfo};
|
||||
|
||||
/// Internal identifier for active sessions.
|
||||
@@ -103,7 +103,6 @@ pub struct SessionManager {
|
||||
/// Receiver half that listens for [`ActiveSessionMessage`] produced by pending sessions.
|
||||
active_session_rx: ReceiverStream<ActiveSessionMessage>,
|
||||
/// Additional RLPx sub-protocols to be used by the session manager.
|
||||
#[allow(unused)]
|
||||
extra_protocols: RlpxSubProtocols,
|
||||
/// Used to measure inbound & outbound bandwidth across all managed streams
|
||||
bandwidth_meter: BandwidthMeter,
|
||||
@@ -176,6 +175,11 @@ impl SessionManager {
|
||||
self.hello_message.clone()
|
||||
}
|
||||
|
||||
/// Adds an additional protocol handler to the RLPx sub-protocol list.
|
||||
pub(crate) fn add_rlpx_sub_protocol(&mut self, protocol: impl IntoRlpxSubProtocol) {
|
||||
self.extra_protocols.push(protocol)
|
||||
}
|
||||
|
||||
/// Spawns the given future onto a new task that is tracked in the `spawned_tasks`
|
||||
/// [`JoinSet`](tokio::task::JoinSet).
|
||||
fn spawn<F>(&self, f: F)
|
||||
|
||||
@@ -2,6 +2,7 @@ use crate::{
|
||||
listener::{ConnectionListener, ListenerEvent},
|
||||
message::{PeerMessage, PeerRequestSender},
|
||||
peers::InboundConnectionError,
|
||||
protocol::IntoRlpxSubProtocol,
|
||||
session::{Direction, PendingSessionHandshakeError, SessionEvent, SessionId, SessionManager},
|
||||
state::{NetworkState, StateAction},
|
||||
};
|
||||
@@ -76,10 +77,7 @@ pub(crate) struct Swarm<C> {
|
||||
|
||||
// === impl Swarm ===
|
||||
|
||||
impl<C> Swarm<C>
|
||||
where
|
||||
C: BlockNumReader,
|
||||
{
|
||||
impl<C> Swarm<C> {
|
||||
/// Configures a new swarm instance.
|
||||
pub(crate) fn new(
|
||||
incoming: ConnectionListener,
|
||||
@@ -90,6 +88,11 @@ where
|
||||
Self { incoming, sessions, state, net_connection_state }
|
||||
}
|
||||
|
||||
/// Adds an additional protocol handler to the RLPx sub-protocol list.
|
||||
pub(crate) fn add_rlpx_sub_protocol(&mut self, protocol: impl IntoRlpxSubProtocol) {
|
||||
self.sessions_mut().add_rlpx_sub_protocol(protocol);
|
||||
}
|
||||
|
||||
/// Access to the state.
|
||||
pub(crate) fn state(&self) -> &NetworkState<C> {
|
||||
&self.state
|
||||
@@ -114,7 +117,12 @@ where
|
||||
pub(crate) fn sessions_mut(&mut self) -> &mut SessionManager {
|
||||
&mut self.sessions
|
||||
}
|
||||
}
|
||||
|
||||
impl<C> Swarm<C>
|
||||
where
|
||||
C: BlockNumReader,
|
||||
{
|
||||
/// Triggers a new outgoing connection to the given node
|
||||
pub(crate) fn dial_outbound(&mut self, remote_addr: SocketAddr, remote_id: PeerId) {
|
||||
self.sessions.dial_outbound(remote_addr, remote_id)
|
||||
|
||||
Reference in New Issue
Block a user