diff --git a/crates/net/network/src/builder.rs b/crates/net/network/src/builder.rs index b6e1dcf919..ea03a00a26 100644 --- a/crates/net/network/src/builder.rs +++ b/crates/net/network/src/builder.rs @@ -14,28 +14,28 @@ pub(crate) const ETH_REQUEST_CHANNEL_CAPACITY: usize = 256; /// A builder that can configure all components of the network. #[allow(missing_debug_implementations)] -pub struct NetworkBuilder { - pub(crate) network: NetworkManager, +pub struct NetworkBuilder { + pub(crate) network: NetworkManager, pub(crate) transactions: Tx, pub(crate) request_handler: Eth, } // === impl NetworkBuilder === -impl NetworkBuilder { +impl NetworkBuilder { /// Consumes the type and returns all fields. - pub fn split(self) -> (NetworkManager, Tx, Eth) { + pub fn split(self) -> (NetworkManager, Tx, Eth) { let Self { network, transactions, request_handler } = self; (network, transactions, request_handler) } /// Returns the network manager. - pub const fn network(&self) -> &NetworkManager { + pub const fn network(&self) -> &NetworkManager { &self.network } /// Returns the mutable network manager. - pub fn network_mut(&mut self) -> &mut NetworkManager { + pub fn network_mut(&mut self) -> &mut NetworkManager { &mut self.network } @@ -45,7 +45,7 @@ impl NetworkBuilder { } /// Consumes the type and returns all fields and also return a [`NetworkHandle`]. - pub fn split_with_handle(self) -> (NetworkHandle, NetworkManager, Tx, Eth) { + pub fn split_with_handle(self) -> (NetworkHandle, NetworkManager, Tx, Eth) { let Self { network, transactions, request_handler } = self; let handle = network.handle().clone(); (handle, network, transactions, request_handler) @@ -56,7 +56,7 @@ impl NetworkBuilder { self, pool: Pool, transactions_manager_config: TransactionsManagerConfig, - ) -> NetworkBuilder, Eth> { + ) -> NetworkBuilder, Eth> { let Self { mut network, request_handler, .. } = self; let (tx, rx) = mpsc::unbounded_channel(); network.set_transactions(tx); @@ -69,7 +69,7 @@ impl NetworkBuilder { pub fn request_handler( self, client: Client, - ) -> NetworkBuilder> { + ) -> NetworkBuilder> { let Self { mut network, transactions, .. } = self; let (tx, rx) = mpsc::channel(ETH_REQUEST_CHANNEL_CAPACITY); network.set_eth_request_handler(tx); diff --git a/crates/net/network/src/config.rs b/crates/net/network/src/config.rs index b197fc55f8..ed8834dd75 100644 --- a/crates/net/network/src/config.rs +++ b/crates/net/network/src/config.rs @@ -121,10 +121,10 @@ impl NetworkConfig { impl NetworkConfig where - C: BlockNumReader, + C: BlockNumReader + 'static, { /// Convenience method for calling [`NetworkManager::new`]. - pub async fn manager(self) -> Result, NetworkError> { + pub async fn manager(self) -> Result { NetworkManager::new(self).await } } @@ -136,8 +136,10 @@ where /// Starts the networking stack given a [`NetworkConfig`] and returns a handle to the network. pub async fn start_network(self) -> Result { let client = self.client.clone(); - let (handle, network, _txpool, eth) = - NetworkManager::builder(self).await?.request_handler(client).split_with_handle(); + let (handle, network, _txpool, eth) = NetworkManager::builder::(self) + .await? + .request_handler::(client) + .split_with_handle(); tokio::task::spawn(network); // TODO: tokio::task::spawn(txpool); diff --git a/crates/net/network/src/lib.rs b/crates/net/network/src/lib.rs index f14f6e8504..28dd6932bc 100644 --- a/crates/net/network/src/lib.rs +++ b/crates/net/network/src/lib.rs @@ -148,6 +148,7 @@ pub use session::{ PendingSessionHandle, PendingSessionHandshakeError, SessionCommand, SessionEvent, SessionId, SessionManager, }; + pub use transactions::{FilterAnnouncement, MessageFilter, ValidateTx68}; pub use reth_eth_wire::{DisconnectReason, HelloMessageWithProtocols}; diff --git a/crates/net/network/src/manager.rs b/crates/net/network/src/manager.rs index 34cd75bd36..97d133116d 100644 --- a/crates/net/network/src/manager.rs +++ b/crates/net/network/src/manager.rs @@ -75,9 +75,9 @@ use tracing::{debug, error, trace, warn}; /// include_mmd!("docs/mermaid/network-manager.mmd") #[derive(Debug)] #[must_use = "The NetworkManager does nothing unless polled"] -pub struct NetworkManager { +pub struct NetworkManager { /// The type that manages the actual network part, which includes connections. - swarm: Swarm, + swarm: Swarm, /// Underlying network handle that can be shared. handle: NetworkHandle, /// Receiver half of the command channel set up between this type and the [`NetworkHandle`] @@ -115,7 +115,7 @@ pub struct NetworkManager { } // === impl NetworkManager === -impl NetworkManager { +impl NetworkManager { /// Sets the dedicated channel for events indented for the /// [`TransactionsManager`](crate::transactions::TransactionsManager). pub fn set_transactions(&mut self, tx: mpsc::UnboundedSender) { @@ -160,15 +160,14 @@ impl NetworkManager { } } -impl NetworkManager -where - C: BlockNumReader, -{ +impl NetworkManager { /// Creates the manager of a new network. /// /// The [`NetworkManager`] is an endless future that needs to be polled in order to advance the /// state of the entire network. - pub async fn new(config: NetworkConfig) -> Result { + pub async fn new( + config: NetworkConfig, + ) -> Result { let NetworkConfig { client, secret_key, @@ -241,8 +240,12 @@ where extra_protocols, ); - let state = - NetworkState::new(client, discovery, peers_manager, Arc::clone(&num_active_peers)); + let state = NetworkState::new( + crate::state::BlockNumReader::new(client), + discovery, + peers_manager, + Arc::clone(&num_active_peers), + ); let swarm = Swarm::new(incoming, sessions, state); @@ -306,15 +309,15 @@ where /// .split_with_handle(); /// } /// ``` - pub async fn builder( + pub async fn builder( config: NetworkConfig, - ) -> Result, NetworkError> { + ) -> Result, NetworkError> { let network = Self::new(config).await?; Ok(network.into_builder()) } /// Create a [`NetworkBuilder`] to configure all components of the network - pub const fn into_builder(self) -> NetworkBuilder { + pub const fn into_builder(self) -> NetworkBuilder<(), ()> { NetworkBuilder { network: self, transactions: (), request_handler: () } } @@ -940,10 +943,7 @@ where } } -impl NetworkManager -where - C: BlockNumReader + Unpin, -{ +impl NetworkManager { /// Drives the [`NetworkManager`] future until a [`GracefulShutdown`] signal is received. /// /// This invokes the given function `shutdown_hook` while holding the graceful shutdown guard. @@ -969,10 +969,7 @@ where } } -impl Future for NetworkManager -where - C: BlockNumReader + Unpin, -{ +impl Future for NetworkManager { type Output = (); fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { diff --git a/crates/net/network/src/state.rs b/crates/net/network/src/state.rs index 3dce15cfca..39802b4265 100644 --- a/crates/net/network/src/state.rs +++ b/crates/net/network/src/state.rs @@ -20,10 +20,11 @@ use reth_eth_wire::{ use reth_network_api::PeerKind; use reth_network_peers::PeerId; use reth_primitives::{ForkId, B256}; -use reth_storage_api::BlockNumReader; use std::{ collections::{HashMap, VecDeque}, + fmt, net::{IpAddr, SocketAddr}, + ops::Deref, sync::{ atomic::{AtomicU64, AtomicUsize}, Arc, @@ -36,6 +37,30 @@ use tracing::{debug, trace}; /// Cache limit of blocks to keep track of for a single peer. const PEER_BLOCK_CACHE_LIMIT: u32 = 512; +/// Wrapper type for the [`BlockNumReader`] trait. +pub(crate) struct BlockNumReader(Box); + +impl BlockNumReader { + /// Create a new instance with the given reader. + pub fn new(reader: impl reth_storage_api::BlockNumReader + 'static) -> Self { + Self(Box::new(reader)) + } +} + +impl fmt::Debug for BlockNumReader { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.debug_struct("BlockNumReader").field("inner", &"").finish() + } +} + +impl Deref for BlockNumReader { + type Target = Box; + + fn deref(&self) -> &Self::Target { + &self.0 + } +} + /// The [`NetworkState`] keeps track of the state of all peers in the network. /// /// This includes: @@ -47,7 +72,7 @@ const PEER_BLOCK_CACHE_LIMIT: u32 = 512; /// /// This type is also responsible for responding for received request. #[derive(Debug)] -pub struct NetworkState { +pub struct NetworkState { /// All active peers and their state. active_peers: HashMap, /// Manages connections to peers. @@ -58,7 +83,7 @@ pub struct NetworkState { /// /// This type is used to fetch the block number after we established a session and received the /// [Status] block hash. - client: C, + client: BlockNumReader, /// Network discovery. discovery: Discovery, /// The type that handles requests. @@ -69,13 +94,10 @@ pub struct NetworkState { state_fetcher: StateFetcher, } -impl NetworkState -where - C: BlockNumReader, -{ +impl NetworkState { /// Create a new state instance with the given params pub(crate) fn new( - client: C, + client: BlockNumReader, discovery: Discovery, peers_manager: PeersManager, num_active_peers: Arc, @@ -523,8 +545,12 @@ pub(crate) enum StateAction { #[cfg(test)] mod tests { use crate::{ - discovery::Discovery, fetch::StateFetcher, message::PeerRequestSender, peers::PeersManager, - state::NetworkState, PeerRequest, + discovery::Discovery, + fetch::StateFetcher, + message::PeerRequestSender, + peers::PeersManager, + state::{BlockNumReader, NetworkState}, + PeerRequest, }; use reth_eth_wire::{ capability::{Capabilities, Capability}, @@ -542,14 +568,14 @@ mod tests { use tokio_stream::{wrappers::ReceiverStream, StreamExt}; /// Returns a testing instance of the [`NetworkState`]. - fn state() -> NetworkState { + fn state() -> NetworkState { let peers = PeersManager::default(); let handle = peers.handle(); NetworkState { active_peers: Default::default(), peers_manager: Default::default(), queued_messages: Default::default(), - client: NoopProvider::default(), + client: BlockNumReader(Box::new(NoopProvider::default())), discovery: Discovery::noop(), state_fetcher: StateFetcher::new(handle, Default::default()), } diff --git a/crates/net/network/src/swarm.rs b/crates/net/network/src/swarm.rs index faf39f8390..83d26a56b2 100644 --- a/crates/net/network/src/swarm.rs +++ b/crates/net/network/src/swarm.rs @@ -13,7 +13,6 @@ use reth_eth_wire::{ EthVersion, Status, }; use reth_network_peers::PeerId; -use reth_storage_api::BlockNumReader; use std::{ io, net::SocketAddr, @@ -50,23 +49,23 @@ use tracing::trace; /// `include_mmd!("docs/mermaid/swarm.mmd`") #[derive(Debug)] #[must_use = "Swarm does nothing unless polled"] -pub(crate) struct Swarm { +pub(crate) struct Swarm { /// Listens for new incoming connections. incoming: ConnectionListener, /// All sessions. sessions: SessionManager, /// Tracks the entire state of the network and handles events received from the sessions. - state: NetworkState, + state: NetworkState, } // === impl Swarm === -impl Swarm { +impl Swarm { /// Configures a new swarm instance. pub(crate) const fn new( incoming: ConnectionListener, sessions: SessionManager, - state: NetworkState, + state: NetworkState, ) -> Self { Self { incoming, sessions, state } } @@ -77,12 +76,12 @@ impl Swarm { } /// Access to the state. - pub(crate) const fn state(&self) -> &NetworkState { + pub(crate) const fn state(&self) -> &NetworkState { &self.state } /// Mutable access to the state. - pub(crate) fn state_mut(&mut self) -> &mut NetworkState { + pub(crate) fn state_mut(&mut self) -> &mut NetworkState { &mut self.state } @@ -102,10 +101,7 @@ impl Swarm { } } -impl Swarm -where - C: BlockNumReader, -{ +impl Swarm { /// Triggers a new outgoing connection to the given node pub(crate) fn dial_outbound(&mut self, remote_addr: SocketAddr, remote_id: PeerId) { self.sessions.dial_outbound(remote_addr, remote_id) @@ -285,10 +281,7 @@ where } } -impl Stream for Swarm -where - C: BlockNumReader + Unpin, -{ +impl Stream for Swarm { type Item = SwarmEvent; /// This advances all components. diff --git a/crates/net/network/src/test_utils/testnet.rs b/crates/net/network/src/test_utils/testnet.rs index 5de45b2298..1fa0603f8e 100644 --- a/crates/net/network/src/test_utils/testnet.rs +++ b/crates/net/network/src/test_utils/testnet.rs @@ -51,7 +51,7 @@ pub struct Testnet { impl Testnet where - C: BlockReader + HeaderProvider + Clone, + C: BlockReader + HeaderProvider + Clone + 'static, { /// Same as [`Self::try_create_with`] but panics on error pub async fn create_with(num_peers: usize, provider: C) -> Self { @@ -85,7 +85,7 @@ where impl Testnet where - C: BlockReader + HeaderProvider + Clone, + C: BlockReader + HeaderProvider + Clone + 'static, Pool: TransactionPool, { /// Return a mutable slice of all peers. @@ -252,7 +252,7 @@ impl fmt::Debug for Testnet { impl Future for Testnet where - C: BlockReader + HeaderProvider + Unpin, + C: BlockReader + HeaderProvider + Unpin + 'static, Pool: TransactionPool + Unpin + 'static, { type Output = (); @@ -326,7 +326,7 @@ impl TestnetHandle { #[derive(Debug)] pub struct Peer { #[pin] - network: NetworkManager, + network: NetworkManager, #[pin] request_handler: Option>, #[pin] @@ -340,7 +340,7 @@ pub struct Peer { impl Peer where - C: BlockReader + HeaderProvider + Clone, + C: BlockReader + HeaderProvider + Clone + 'static, Pool: TransactionPool, { /// Returns the number of connected peers. @@ -373,7 +373,7 @@ where } /// Returns mutable access to the network. - pub fn network_mut(&mut self) -> &mut NetworkManager { + pub fn network_mut(&mut self) -> &mut NetworkManager { &mut self.network } @@ -437,7 +437,7 @@ where impl Peer where - C: BlockReader + HeaderProvider + Clone, + C: BlockReader + HeaderProvider + Clone + 'static, { /// Installs a new [`TestPool`] pub fn install_test_pool(&mut self) { @@ -447,7 +447,7 @@ where impl Future for Peer where - C: BlockReader + HeaderProvider + Unpin, + C: BlockReader + HeaderProvider + Unpin + 'static, Pool: TransactionPool + Unpin + 'static, { type Output = (); @@ -526,7 +526,7 @@ impl PeerHandle { impl PeerConfig where - C: BlockReader + HeaderProvider + Clone, + C: BlockReader + HeaderProvider + Clone + 'static, { /// Launches the network and returns the [Peer] that manages it pub async fn launch(self) -> Result, NetworkError> { diff --git a/crates/net/network/tests/it/connect.rs b/crates/net/network/tests/it/connect.rs index b379a67044..88a0689b33 100644 --- a/crates/net/network/tests/it/connect.rs +++ b/crates/net/network/tests/it/connect.rs @@ -674,7 +674,7 @@ async fn test_rejected_by_already_connect() { async fn new_random_peer( max_in_bound: usize, trusted_nodes: HashSet, -) -> NetworkManager { +) -> NetworkManager { let secret_key = SecretKey::new(&mut rand::thread_rng()); let peers_config = PeersConfig::default().with_max_inbound(max_in_bound).with_trusted_nodes(trusted_nodes); @@ -683,7 +683,7 @@ async fn new_random_peer( .listener_port(0) .disable_discovery() .peer_config(peers_config) - .build(NoopProvider::default()); + .build_with_noop_provider(); NetworkManager::new(config).await.unwrap() } diff --git a/crates/node/builder/src/builder/mod.rs b/crates/node/builder/src/builder/mod.rs index d46b73d768..a6b723d3e5 100644 --- a/crates/node/builder/src/builder/mod.rs +++ b/crates/node/builder/src/builder/mod.rs @@ -478,7 +478,7 @@ impl BuilderContext { } /// Creates the [`NetworkBuilder`] for the node. - pub async fn network_builder(&self) -> eyre::Result> { + pub async fn network_builder(&self) -> eyre::Result> { let network_config = self.network_config()?; let builder = NetworkManager::builder(network_config).await?; Ok(builder) @@ -488,11 +488,7 @@ impl BuilderContext { /// /// Spawns the configured network and associated tasks and returns the [`NetworkHandle`] /// connected to that network. - pub fn start_network( - &self, - builder: NetworkBuilder, - pool: Pool, - ) -> NetworkHandle + pub fn start_network(&self, builder: NetworkBuilder<(), ()>, pool: Pool) -> NetworkHandle where Pool: TransactionPool + Unpin + 'static, { diff --git a/crates/node/core/src/cli/config.rs b/crates/node/core/src/cli/config.rs index e3ea46bb48..f40d4287f4 100644 --- a/crates/node/core/src/cli/config.rs +++ b/crates/node/core/src/cli/config.rs @@ -49,7 +49,7 @@ pub trait RethNetworkConfig { // TODO add more network config methods here } -impl RethNetworkConfig for reth_network::NetworkManager { +impl RethNetworkConfig for reth_network::NetworkManager { fn add_rlpx_sub_protocol(&mut self, protocol: impl IntoRlpxSubProtocol) { Self::add_rlpx_sub_protocol(self, protocol); }