From 5e2affb15a7c43dc7ab6f1f709e4565acba78cc3 Mon Sep 17 00:00:00 2001 From: Matthias Seitz Date: Wed, 22 Nov 2023 20:58:02 +0100 Subject: [PATCH] feat: add configure network function to cli ext (#5536) --- bin/reth/src/cli/config.rs | 19 ++++++++++ bin/reth/src/cli/ext.rs | 46 ++++++++++++++++++++--- bin/reth/src/node/mod.rs | 54 ++++++++++++++------------- crates/net/network/src/builder.rs | 15 ++++++++ crates/net/network/src/manager.rs | 6 +++ crates/net/network/src/protocol.rs | 6 +++ crates/net/network/src/session/mod.rs | 8 +++- crates/net/network/src/swarm.rs | 16 ++++++-- 8 files changed, 133 insertions(+), 37 deletions(-) diff --git a/bin/reth/src/cli/config.rs b/bin/reth/src/cli/config.rs index 8700edf04b..48c1e2bd5f 100644 --- a/bin/reth/src/cli/config.rs +++ b/bin/reth/src/cli/config.rs @@ -1,6 +1,7 @@ //! Config traits for various node components. use alloy_rlp::Encodable; +use reth_network::protocol::IntoRlpxSubProtocol; use reth_primitives::{Bytes, BytesMut}; use reth_rpc::{eth::gas_oracle::GasPriceOracleConfig, JwtError, JwtSecret}; use reth_rpc_builder::{ @@ -102,3 +103,21 @@ pub trait PayloadBuilderConfig { #[cfg(feature = "optimism")] fn compute_pending_block(&self) -> bool; } + +/// A trait that can be used to apply additional configuration to the network. +pub trait RethNetworkConfig { + /// Adds a new additional protocol to the RLPx sub-protocol list. + /// + /// These additional protocols are negotiated during the RLPx handshake. + /// If both peers share the same protocol, the corresponding handler will be included alongside + /// the `eth` protocol. + /// + /// See also [ProtocolHandler](reth_network::protocol::ProtocolHandler) + fn add_rlpx_sub_protocol(&mut self, protocol: impl IntoRlpxSubProtocol); +} + +impl RethNetworkConfig for reth_network::NetworkManager { + fn add_rlpx_sub_protocol(&mut self, protocol: impl IntoRlpxSubProtocol) { + reth_network::NetworkManager::add_rlpx_sub_protocol(self, protocol); + } +} diff --git a/bin/reth/src/cli/ext.rs b/bin/reth/src/cli/ext.rs index c48778f2c3..3529975273 100644 --- a/bin/reth/src/cli/ext.rs +++ b/bin/reth/src/cli/ext.rs @@ -10,7 +10,7 @@ use reth_payload_builder::{PayloadBuilderHandle, PayloadBuilderService}; use reth_tasks::TaskSpawner; use std::{fmt, marker::PhantomData}; -use crate::cli::components::RethRpcServerHandles; +use crate::cli::{components::RethRpcServerHandles, config::RethNetworkConfig}; /// A trait that allows for extending parts of the CLI with additional functionality. /// @@ -35,12 +35,30 @@ impl RethCliExt for () { /// /// The functions are invoked during the initialization of the node command in the following order: /// -/// 1. [on_components_initialized](RethNodeCommandConfig::on_components_initialized) -/// 2. [spawn_payload_builder_service](RethNodeCommandConfig::spawn_payload_builder_service) -/// 3. [extend_rpc_modules](RethNodeCommandConfig::extend_rpc_modules) -/// 4. [on_rpc_server_started](RethNodeCommandConfig::on_rpc_server_started) -/// 5. [on_node_started](RethNodeCommandConfig::on_node_started) +/// 1. [configure_network](RethNodeCommandConfig::configure_network) +/// 2. [on_components_initialized](RethNodeCommandConfig::on_components_initialized) +/// 3. [spawn_payload_builder_service](RethNodeCommandConfig::spawn_payload_builder_service) +/// 4. [extend_rpc_modules](RethNodeCommandConfig::extend_rpc_modules) +/// 5. [on_rpc_server_started](RethNodeCommandConfig::on_rpc_server_started) +/// 6. [on_node_started](RethNodeCommandConfig::on_node_started) pub trait RethNodeCommandConfig: fmt::Debug { + /// Invoked with the network configuration before the network is configured. + /// + /// This allows additional configuration of the network before it is launched. + fn configure_network( + &mut self, + config: &mut Conf, + components: &Reth, + ) -> eyre::Result<()> + where + Conf: RethNetworkConfig, + Reth: RethNodeComponents, + { + let _ = config; + let _ = components; + Ok(()) + } + /// Event hook called once all components have been initialized. /// /// This is called as soon as the node components have been initialized. @@ -224,6 +242,22 @@ impl NoArgs { } impl RethNodeCommandConfig for NoArgs { + fn configure_network( + &mut self, + config: &mut Conf, + components: &Reth, + ) -> eyre::Result<()> + where + Conf: RethNetworkConfig, + Reth: RethNodeComponents, + { + if let Some(conf) = self.inner_mut() { + conf.configure_network(config, components) + } else { + Ok(()) + } + } + fn on_components_initialized( &mut self, components: &Reth, diff --git a/bin/reth/src/node/mod.rs b/bin/reth/src/node/mod.rs index 3a694c7413..42d3e0136f 100644 --- a/bin/reth/src/node/mod.rs +++ b/bin/reth/src/node/mod.rs @@ -49,9 +49,7 @@ use reth_interfaces::{ }, RethResult, }; -use reth_network::{ - error::NetworkError, NetworkConfig, NetworkEvents, NetworkHandle, NetworkManager, -}; +use reth_network::{NetworkBuilder, NetworkConfig, NetworkEvents, NetworkHandle, NetworkManager}; use reth_network_api::{NetworkInfo, PeersInfo}; use reth_primitives::{ constants::eip4844::{LoadKzgSettingsError, MAINNET_KZG_TRUSTED_SETUP}, @@ -356,25 +354,34 @@ impl NodeCommand { secret_key, default_peers_path.clone(), ); - let network = self - .start_network( - network_config, - &ctx.task_executor, - transaction_pool.clone(), - default_peers_path, - ) - .await?; - info!(target: "reth::cli", peer_id = %network.peer_id(), local_addr = %network.local_addr(), enode = %network.local_node_record(), "Connected to P2P network"); - debug!(target: "reth::cli", peer_id = ?network.peer_id(), "Full peer ID"); - let network_client = network.fetch_client().await?; + + let network_client = network_config.client.clone(); + let mut network_builder = NetworkManager::builder(network_config).await?; let components = RethNodeComponentsImpl { provider: blockchain_db.clone(), pool: transaction_pool.clone(), - network: network.clone(), + network: network_builder.handle(), task_executor: ctx.task_executor.clone(), events: blockchain_db.clone(), }; + + // allow network modifications + self.ext.configure_network(network_builder.network_mut(), &components)?; + + // launch network + let network = self.start_network( + network_builder, + &ctx.task_executor, + transaction_pool.clone(), + network_client, + default_peers_path, + ); + + info!(target: "reth::cli", peer_id = %network.peer_id(), local_addr = %network.local_addr(), enode = %network.local_node_record(), "Connected to P2P network"); + debug!(target: "reth::cli", peer_id = ?network.peer_id(), "Full peer ID"); + let network_client = network.fetch_client().await?; + self.ext.on_components_initialized(&components)?; debug!(target: "reth::cli", "Spawning payload builder service"); @@ -694,23 +701,20 @@ impl NodeCommand { /// Spawns the configured network and associated tasks and returns the [NetworkHandle] connected /// to that network. - async fn start_network( + fn start_network( &self, - config: NetworkConfig, + builder: NetworkBuilder, task_executor: &TaskExecutor, pool: Pool, + client: C, default_peers_path: PathBuf, - ) -> Result + ) -> NetworkHandle where C: BlockReader + HeaderProvider + Clone + Unpin + 'static, Pool: TransactionPool + Unpin + 'static, { - let client = config.client.clone(); - let (handle, network, txpool, eth) = NetworkManager::builder(config) - .await? - .transactions(pool) - .request_handler(client) - .split_with_handle(); + let (handle, network, txpool, eth) = + builder.transactions(pool).request_handler(client).split_with_handle(); task_executor.spawn_critical("p2p txpool", txpool); task_executor.spawn_critical("p2p eth request handler", eth); @@ -720,7 +724,7 @@ impl NodeCommand { run_network_until_shutdown(shutdown, network, known_peers_file) }); - Ok(handle) + handle } /// Fetches the head block from the database. diff --git a/crates/net/network/src/builder.rs b/crates/net/network/src/builder.rs index efe17ec5a4..05c84b7da9 100644 --- a/crates/net/network/src/builder.rs +++ b/crates/net/network/src/builder.rs @@ -28,6 +28,21 @@ impl NetworkBuilder { (network, transactions, request_handler) } + /// Returns the network manager. + pub fn network(&self) -> &NetworkManager { + &self.network + } + + /// Returns the mutable network manager. + pub fn network_mut(&mut self) -> &mut NetworkManager { + &mut self.network + } + + /// Returns the handle to the network. + pub fn handle(&self) -> NetworkHandle { + self.network.handle().clone() + } + /// Consumes the type and returns all fields and also return a [`NetworkHandle`]. pub fn split_with_handle(self) -> (NetworkHandle, NetworkManager, Tx, Eth) { let NetworkBuilder { network, transactions, request_handler } = self; diff --git a/crates/net/network/src/manager.rs b/crates/net/network/src/manager.rs index 7df8addb21..13d5702531 100644 --- a/crates/net/network/src/manager.rs +++ b/crates/net/network/src/manager.rs @@ -26,6 +26,7 @@ use crate::{ metrics::{DisconnectMetrics, NetworkMetrics, NETWORK_POOL_TRANSACTIONS_SCOPE}, network::{NetworkHandle, NetworkHandleMessage}, peers::{PeersHandle, PeersManager}, + protocol::IntoRlpxSubProtocol, session::SessionManager, state::NetworkState, swarm::{NetworkConnectionState, Swarm, SwarmEvent}, @@ -142,6 +143,11 @@ impl NetworkManager { self.to_eth_request_handler = Some(tx); } + /// Adds an additional protocol handler to the RLPx sub-protocol list. + pub fn add_rlpx_sub_protocol(&mut self, protocol: impl IntoRlpxSubProtocol) { + self.swarm.add_rlpx_sub_protocol(protocol) + } + /// Returns the [`NetworkHandle`] that can be cloned and shared. /// /// The [`NetworkHandle`] can be used to interact with this [`NetworkManager`] diff --git a/crates/net/network/src/protocol.rs b/crates/net/network/src/protocol.rs index 24dd686904..1ba6464def 100644 --- a/crates/net/network/src/protocol.rs +++ b/crates/net/network/src/protocol.rs @@ -116,6 +116,12 @@ where } } +impl IntoRlpxSubProtocol for RlpxSubProtocol { + fn into_rlpx_sub_protocol(self) -> RlpxSubProtocol { + self + } +} + /// Additional RLPx-based sub-protocols. #[derive(Debug, Default)] pub struct RlpxSubProtocols { diff --git a/crates/net/network/src/session/mod.rs b/crates/net/network/src/session/mod.rs index 0e6fdcd7fb..863964ac97 100644 --- a/crates/net/network/src/session/mod.rs +++ b/crates/net/network/src/session/mod.rs @@ -48,7 +48,7 @@ pub use handle::{ SessionCommand, }; -use crate::protocol::RlpxSubProtocols; +use crate::protocol::{IntoRlpxSubProtocol, RlpxSubProtocols}; pub use reth_network_api::{Direction, PeerInfo}; /// Internal identifier for active sessions. @@ -103,7 +103,6 @@ pub struct SessionManager { /// Receiver half that listens for [`ActiveSessionMessage`] produced by pending sessions. active_session_rx: ReceiverStream, /// Additional RLPx sub-protocols to be used by the session manager. - #[allow(unused)] extra_protocols: RlpxSubProtocols, /// Used to measure inbound & outbound bandwidth across all managed streams bandwidth_meter: BandwidthMeter, @@ -176,6 +175,11 @@ impl SessionManager { self.hello_message.clone() } + /// Adds an additional protocol handler to the RLPx sub-protocol list. + pub(crate) fn add_rlpx_sub_protocol(&mut self, protocol: impl IntoRlpxSubProtocol) { + self.extra_protocols.push(protocol) + } + /// Spawns the given future onto a new task that is tracked in the `spawned_tasks` /// [`JoinSet`](tokio::task::JoinSet). fn spawn(&self, f: F) diff --git a/crates/net/network/src/swarm.rs b/crates/net/network/src/swarm.rs index 9f32efd168..ce647fe181 100644 --- a/crates/net/network/src/swarm.rs +++ b/crates/net/network/src/swarm.rs @@ -2,6 +2,7 @@ use crate::{ listener::{ConnectionListener, ListenerEvent}, message::{PeerMessage, PeerRequestSender}, peers::InboundConnectionError, + protocol::IntoRlpxSubProtocol, session::{Direction, PendingSessionHandshakeError, SessionEvent, SessionId, SessionManager}, state::{NetworkState, StateAction}, }; @@ -76,10 +77,7 @@ pub(crate) struct Swarm { // === impl Swarm === -impl Swarm -where - C: BlockNumReader, -{ +impl Swarm { /// Configures a new swarm instance. pub(crate) fn new( incoming: ConnectionListener, @@ -90,6 +88,11 @@ where Self { incoming, sessions, state, net_connection_state } } + /// Adds an additional protocol handler to the RLPx sub-protocol list. + pub(crate) fn add_rlpx_sub_protocol(&mut self, protocol: impl IntoRlpxSubProtocol) { + self.sessions_mut().add_rlpx_sub_protocol(protocol); + } + /// Access to the state. pub(crate) fn state(&self) -> &NetworkState { &self.state @@ -114,7 +117,12 @@ where pub(crate) fn sessions_mut(&mut self) -> &mut SessionManager { &mut self.sessions } +} +impl Swarm +where + C: BlockNumReader, +{ /// Triggers a new outgoing connection to the given node pub(crate) fn dial_outbound(&mut self, remote_addr: SocketAddr, remote_id: PeerId) { self.sessions.dial_outbound(remote_addr, remote_id)