From 12e7f0acbcbff054b5c7db68ee54acf7313e18ff Mon Sep 17 00:00:00 2001 From: Matthias Seitz Date: Wed, 14 Dec 2022 11:39:09 +0100 Subject: [PATCH] feat(net): make Status and Hello configurable (#431) * feat(net): make Status and Hello configurable * set port --- crates/net/eth-wire/src/hello.rs | 86 +++++++++++++++++++++++-- crates/net/eth-wire/src/types/status.rs | 5 +- crates/net/network/src/config.rs | 81 +++++++++++++++++++---- crates/net/network/src/lib.rs | 2 +- crates/net/network/src/manager.rs | 5 +- crates/net/network/src/session/mod.rs | 31 ++++----- crates/net/network/tests/it/connect.rs | 30 ++++++++- 7 files changed, 199 insertions(+), 41 deletions(-) diff --git a/crates/net/eth-wire/src/hello.rs b/crates/net/eth-wire/src/hello.rs index a7133d2347..1b29ce12fb 100644 --- a/crates/net/eth-wire/src/hello.rs +++ b/crates/net/eth-wire/src/hello.rs @@ -1,14 +1,15 @@ -use crate::{capability::Capability, ProtocolVersion}; +use crate::{capability::Capability, EthVersion, ProtocolVersion}; use reth_primitives::PeerId; use reth_rlp::{RlpDecodable, RlpEncodable}; use serde::{Deserialize, Serialize}; +/// The client version: `reth/v{major}.{minor}.{patch}` +pub(crate) const DEFAULT_CLIENT_VERSION: &str = concat!("reth/v", env!("CARGO_PKG_VERSION")); + // TODO: determine if we should allow for the extra fields at the end like EIP-706 suggests /// Message used in the `p2p` handshake, containing information about the supported RLPx protocol /// version and capabilities. -#[derive( - Clone, Debug, PartialEq, Eq, RlpEncodable, RlpDecodable, Serialize, Deserialize, Default, -)] +#[derive(Clone, Debug, PartialEq, Eq, RlpEncodable, RlpDecodable, Serialize, Deserialize)] pub struct HelloMessage { /// The version of the `p2p` protocol. pub protocol_version: ProtocolVersion, @@ -23,6 +24,83 @@ pub struct HelloMessage { pub id: PeerId, } +// === impl HelloMessage === + +impl HelloMessage { + /// Starts a new [`HelloMessageBuilder`] + /// + /// ``` + /// use secp256k1::{SECP256K1, SecretKey}; + /// use reth_ecies::util::pk2id; + /// use reth_eth_wire::HelloMessage; + /// let secret_key = SecretKey::new(&mut rand::thread_rng()); + /// let id = pk2id(&secret_key.public_key(SECP256K1)); + /// let status = HelloMessage::builder(id).build(); + /// ``` + pub fn builder(id: PeerId) -> HelloMessageBuilder { + HelloMessageBuilder::new(id) + } +} + +pub struct HelloMessageBuilder { + /// The version of the `p2p` protocol. + pub protocol_version: Option, + /// Specifies the client software identity, as a human-readable string (e.g. + /// "Ethereum(++)/1.0.0"). + pub client_version: Option, + /// The list of supported capabilities and their versions. + pub capabilities: Option>, + /// The port that the client is listening on, zero indicates the client is not listening. + pub port: Option, + /// The secp256k1 public key corresponding to the node's private key. + pub id: PeerId, +} + +// === impl HelloMessageBuilder === + +impl HelloMessageBuilder { + /// Create a new builder to configure a [`HelloMessage`] + pub fn new(id: PeerId) -> Self { + Self { protocol_version: None, client_version: None, capabilities: None, port: None, id } + } + + /// Sets the port the client is listening on + pub fn port(mut self, port: u16) -> Self { + self.port = Some(port); + self + } + + /// Sets capabilities. + pub fn capabilities(mut self, capabilities: Vec) -> Self { + self.capabilities = Some(capabilities); + self + } + + /// Sets client version. + pub fn client_version(mut self, client_version: impl Into) -> Self { + self.client_version = Some(client_version.into()); + self + } + + /// Sets client version. + pub fn protocol_version(mut self, protocol_version: ProtocolVersion) -> Self { + self.protocol_version = Some(protocol_version); + self + } + + /// Consumes the type and returns the configured [`HelloMessage`] + pub fn build(self) -> HelloMessage { + let Self { protocol_version, client_version, capabilities, port, id } = self; + HelloMessage { + protocol_version: protocol_version.unwrap_or_default(), + client_version: client_version.unwrap_or_else(|| DEFAULT_CLIENT_VERSION.to_string()), + capabilities: capabilities.unwrap_or_else(|| vec![EthVersion::Eth67.into()]), + port: port.unwrap_or(30303), + id, + } + } +} + #[cfg(test)] mod tests { use reth_ecies::util::pk2id; diff --git a/crates/net/eth-wire/src/types/status.rs b/crates/net/eth-wire/src/types/status.rs index 4f605414ea..1b77477c09 100644 --- a/crates/net/eth-wire/src/types/status.rs +++ b/crates/net/eth-wire/src/types/status.rs @@ -91,15 +91,16 @@ impl Debug for Status { } } +// impl Default for Status { fn default() -> Self { Status { version: EthVersion::Eth67 as u8, chain: Chain::Named(ethers_core::types::Chain::Mainnet), - total_difficulty: U256::zero(), + total_difficulty: 17_179_869_184u64.into(), blockhash: MAINNET_GENESIS, genesis: MAINNET_GENESIS, - forkid: Hardfork::Homestead.fork_id(), + forkid: Hardfork::Latest.fork_id(), } } } diff --git a/crates/net/network/src/config.rs b/crates/net/network/src/config.rs index ee9c762cc2..a4e1d8f30e 100644 --- a/crates/net/network/src/config.rs +++ b/crates/net/network/src/config.rs @@ -6,9 +6,9 @@ use crate::{ session::SessionsConfig, }; use reth_discv4::{Discv4Config, Discv4ConfigBuilder, NodeRecord, DEFAULT_DISCOVERY_PORT}; -use reth_primitives::{Chain, ForkId, H256}; +use reth_primitives::{Chain, PeerId, H256}; use reth_tasks::TaskExecutor; -use secp256k1::SecretKey; +use secp256k1::{SecretKey, SECP256K1}; use std::{ net::{Ipv4Addr, SocketAddr, SocketAddrV4}, sync::Arc, @@ -21,6 +21,8 @@ mod __reexport { pub use secp256k1::SecretKey; } pub use __reexport::*; +use reth_ecies::util::pk2id; +use reth_eth_wire::{HelloMessage, Status}; /// Convenience function to create a new random [`SecretKey`] pub fn rng_secret_key() -> SecretKey { @@ -45,9 +47,6 @@ pub struct NetworkConfig { pub peers_config: PeersConfig, /// How to configure the [SessionManager](crate::session::SessionManager). pub sessions_config: SessionsConfig, - /// A fork identifier as defined by EIP-2124. - /// Serves as the chain compatibility identifier. - pub fork_id: Option, /// The id of the network pub chain: Chain, /// Genesis hash of the network @@ -58,6 +57,10 @@ pub struct NetworkConfig { pub network_mode: NetworkMode, /// The executor to use for spawning tasks. pub executor: Option, + /// The `Status` message to send to peers at the beginning. + pub status: Status, + /// Sets the hello message for the p2p handshake in RLPx + pub hello_message: HelloMessage, } // === impl NetworkConfig === @@ -105,9 +108,6 @@ pub struct NetworkConfigBuilder { peers_config: Option, /// How to configure the sessions manager sessions_config: Option, - /// A fork identifier as defined by EIP-2124. - /// Serves as the chain compatibility identifier. - fork_id: Option, /// The network's chain id chain: Chain, /// Network genesis hash @@ -118,6 +118,10 @@ pub struct NetworkConfigBuilder { network_mode: NetworkMode, /// The executor to use for spawning tasks. executor: Option, + /// The `Status` message to send to peers at the beginning. + status: Option, + /// Sets the hello message for the p2p handshake in RLPx + hello_message: Option, } // === impl NetworkConfigBuilder === @@ -134,15 +138,54 @@ impl NetworkConfigBuilder { listener_addr: None, peers_config: None, sessions_config: None, - fork_id: None, chain: Chain::Named(reth_primitives::rpc::Chain::Mainnet), genesis_hash: Default::default(), block_import: Box::::default(), network_mode: Default::default(), executor: None, + status: None, + hello_message: None, } } + /// Returns the configured [`PeerId`] + pub fn get_peer_id(&self) -> PeerId { + pk2id(&self.secret_key.public_key(SECP256K1)) + } + + /// Sets the `Status` message to send when connecting to peers. + /// + /// ``` + /// # use reth_eth_wire::Status; + /// # use reth_network::NetworkConfigBuilder; + /// # fn builder(builder: NetworkConfigBuilder) { + /// builder.status( + /// Status::builder().build() + /// ); + /// # } + /// ``` + pub fn status(mut self, status: Status) -> Self { + self.status = Some(status); + self + } + + /// Sets the `HelloMessage` to send when connecting to peers. + /// + /// ``` + /// # use reth_eth_wire::HelloMessage; + /// # use reth_network::NetworkConfigBuilder; + /// # fn builder(builder: NetworkConfigBuilder) { + /// let peer_id = builder.get_peer_id(); + /// builder.hello_message( + /// HelloMessage::builder(peer_id).build() + /// ); + /// # } + /// ``` + pub fn hello_message(mut self, hello_message: HelloMessage) -> Self { + self.hello_message = Some(hello_message); + self + } + /// set a custom peer config for how peers are handled pub fn peer_config(mut self, config: PeersConfig) -> Self { self.peers_config = Some(config); @@ -199,6 +242,7 @@ impl NetworkConfigBuilder { /// Consumes the type and creates the actual [`NetworkConfig`] pub fn build(self) -> NetworkConfig { + let peer_id = self.get_peer_id(); let Self { client, secret_key, @@ -208,13 +252,23 @@ impl NetworkConfigBuilder { listener_addr, peers_config, sessions_config, - fork_id, chain, genesis_hash, block_import, network_mode, executor, + status, + hello_message, } = self; + + let listener_addr = listener_addr.unwrap_or_else(|| { + SocketAddr::V4(SocketAddrV4::new(Ipv4Addr::UNSPECIFIED, DEFAULT_DISCOVERY_PORT)) + }); + + let mut hello_message = + hello_message.unwrap_or_else(|| HelloMessage::builder(peer_id).build()); + hello_message.port = listener_addr.port(); + NetworkConfig { client, secret_key, @@ -223,17 +277,16 @@ impl NetworkConfigBuilder { discovery_addr: discovery_addr.unwrap_or_else(|| { SocketAddr::V4(SocketAddrV4::new(Ipv4Addr::UNSPECIFIED, DEFAULT_DISCOVERY_PORT)) }), - listener_addr: listener_addr.unwrap_or_else(|| { - SocketAddr::V4(SocketAddrV4::new(Ipv4Addr::UNSPECIFIED, DEFAULT_DISCOVERY_PORT)) - }), + listener_addr, peers_config: peers_config.unwrap_or_default(), sessions_config: sessions_config.unwrap_or_default(), - fork_id, chain, genesis_hash, block_import, network_mode, executor, + status: status.unwrap_or_default(), + hello_message, } } } diff --git a/crates/net/network/src/lib.rs b/crates/net/network/src/lib.rs index 7043c3624e..3eeb6b9f98 100644 --- a/crates/net/network/src/lib.rs +++ b/crates/net/network/src/lib.rs @@ -97,7 +97,7 @@ mod swarm; pub mod transactions; pub use builder::NetworkBuilder; -pub use config::NetworkConfig; +pub use config::{NetworkConfig, NetworkConfigBuilder}; pub use fetch::FetchClient; pub use manager::{NetworkEvent, NetworkManager}; pub use message::PeerRequest; diff --git a/crates/net/network/src/manager.rs b/crates/net/network/src/manager.rs index 09a3aa2750..2332acc2a2 100644 --- a/crates/net/network/src/manager.rs +++ b/crates/net/network/src/manager.rs @@ -145,6 +145,8 @@ where network_mode, boot_nodes, executor, + hello_message, + status, .. } = config; @@ -160,7 +162,8 @@ where // need to retrieve the addr here since provided port could be `0` let local_peer_id = discovery.local_id(); - let sessions = SessionManager::new(secret_key, sessions_config, executor); + let sessions = + SessionManager::new(secret_key, sessions_config, executor, status, hello_message); let state = NetworkState::new(client, discovery, peers_manger, genesis_hash); let swarm = Swarm::new(incoming, sessions, state); diff --git a/crates/net/network/src/session/mod.rs b/crates/net/network/src/session/mod.rs index 9942a5e0f1..29e6f6a686 100644 --- a/crates/net/network/src/session/mod.rs +++ b/crates/net/network/src/session/mod.rs @@ -5,6 +5,7 @@ use crate::{ message::PeerMessage, session::{ active::ActiveSession, + config::SessionCounter, handle::{ ActiveSessionHandle, ActiveSessionMessage, PendingSessionEvent, PendingSessionHandle, SessionCommand, @@ -17,11 +18,10 @@ use reth_ecies::stream::ECIESStream; use reth_eth_wire::{ capability::{Capabilities, CapabilityMessage}, error::EthStreamError, - DisconnectReason, HelloBuilder, HelloMessage, Status, StatusBuilder, UnauthedEthStream, - UnauthedP2PStream, + DisconnectReason, HelloMessage, Status, UnauthedEthStream, UnauthedP2PStream, }; use reth_primitives::{ForkFilter, Hardfork, PeerId}; -use secp256k1::{SecretKey, SECP256K1}; +use secp256k1::SecretKey; use std::{ collections::HashMap, future::Future, @@ -40,9 +40,8 @@ use tracing::{instrument, trace, warn}; mod active; mod config; mod handle; -use crate::session::config::SessionCounter; pub use config::SessionsConfig; -use reth_ecies::util::pk2id; + use reth_tasks::TaskExecutor; /// Internal identifier for active sessions. @@ -62,8 +61,8 @@ pub(crate) struct SessionManager { secret_key: SecretKey, /// The `Status` message to send to peers. status: Status, - /// THe `Hello` message to send to peers. - hello: HelloMessage, + /// THe `HelloMessage` message to send to peers. + hello_message: HelloMessage, /// The [`ForkFilter`] used to validate the peer's `Status` message. fork_filter: ForkFilter, /// Size of the command buffer per session. @@ -101,18 +100,14 @@ impl SessionManager { secret_key: SecretKey, config: SessionsConfig, executor: Option, + status: Status, + hello_message: HelloMessage, ) -> Self { let (pending_sessions_tx, pending_sessions_rx) = mpsc::channel(config.session_event_buffer); let (active_session_tx, active_session_rx) = mpsc::channel(config.session_event_buffer); - let pk = secret_key.public_key(SECP256K1); - let peer_id = pk2id(&pk); - - // TODO: make sure this is the right place to put these builders - maybe per-Network rather - // than per-Session? - let hello = HelloBuilder::new(peer_id).build(); - let status = StatusBuilder::default().build(); - let fork_filter = Hardfork::Frontier.fork_filter(); + let hardfork = Hardfork::from(status.forkid.next); + let fork_filter = hardfork.fork_filter(); Self { next_id: 0, @@ -120,7 +115,7 @@ impl SessionManager { request_timeout: config.request_timeout, secret_key, status, - hello, + hello_message, fork_filter, session_command_buffer: config.session_command_buffer, executor, @@ -180,7 +175,7 @@ impl SessionManager { pending_events, remote_addr, self.secret_key, - self.hello.clone(), + self.hello_message.clone(), self.status, self.fork_filter.clone(), )); @@ -204,7 +199,7 @@ impl SessionManager { remote_addr, remote_peer_id, self.secret_key, - self.hello.clone(), + self.hello_message.clone(), self.status, self.fork_filter.clone(), )); diff --git a/crates/net/network/tests/it/connect.rs b/crates/net/network/tests/it/connect.rs index 8de90b5c9c..58926eaf5e 100644 --- a/crates/net/network/tests/it/connect.rs +++ b/crates/net/network/tests/it/connect.rs @@ -87,6 +87,34 @@ async fn test_connect_with_boot_nodes() { } } +#[tokio::test(flavor = "multi_thread")] +#[ignore] +async fn test_connect_with_builder() { + reth_tracing::init_tracing(); + let secret_key = SecretKey::new(&mut rand::thread_rng()); + let mut discv4 = Discv4Config::builder(); + discv4.add_boot_nodes(mainnet_nodes()); + + let client = Arc::new(TestApi::default()); + let config = NetworkConfig::builder(Arc::clone(&client), secret_key).discovery(discv4).build(); + let (handle, network, _, requests) = NetworkManager::new(config) + .await + .unwrap() + .into_builder() + .request_handler(client) + .split_with_handle(); + + let mut events = handle.event_listener(); + + tokio::task::spawn(async move { + tokio::join!(network, requests); + }); + + while let Some(ev) = events.next().await { + dbg!(ev); + } +} + #[tokio::test(flavor = "multi_thread")] async fn test_incoming_node_id_blacklist() { reth_tracing::init_tracing(); @@ -200,7 +228,7 @@ async fn test_outgoing_connect_with_single_geth() { let geth_socket = SocketAddr::new([127, 0, 0, 1].into(), geth_p2p_port); let geth_endpoint = SocketAddr::new([127, 0, 0, 1].into(), geth.port()).to_string(); - let provider = Provider::::try_from(format!("http://{}", geth_endpoint)).unwrap(); + let provider = Provider::::try_from(format!("http://{geth_endpoint}")).unwrap(); // get the peer id we should be expecting let geth_peer_id: PeerId =