From 0f45f16455d77c904b263b02514291aa2283a974 Mon Sep 17 00:00:00 2001 From: Matthias Seitz Date: Wed, 23 Nov 2022 01:33:01 +0100 Subject: [PATCH] test(net): add Testnet implementation for testing (#232) * test(net): add Testnet implementation for testing * test(net): add testnet type and test --- crates/net/network/Cargo.toml | 4 + crates/net/network/src/config.rs | 12 ++ crates/net/network/src/lib.rs | 2 +- crates/net/network/src/manager.rs | 50 ++++-- crates/net/network/src/network.rs | 42 ++++- crates/net/network/src/session/active.rs | 4 +- crates/net/network/src/session/mod.rs | 12 +- crates/net/network/src/state.rs | 14 +- crates/net/network/src/swarm.rs | 9 + crates/net/network/src/transactions.rs | 2 +- crates/net/network/tests/it/connect.rs | 48 +++++ crates/net/network/tests/it/main.rs | 5 + crates/net/network/tests/it/testnet.rs | 213 +++++++++++++++++++++++ 13 files changed, 388 insertions(+), 29 deletions(-) create mode 100644 crates/net/network/tests/it/connect.rs create mode 100644 crates/net/network/tests/it/main.rs create mode 100644 crates/net/network/tests/it/testnet.rs diff --git a/crates/net/network/Cargo.toml b/crates/net/network/Cargo.toml index 7a1a7043ab..396c6421e9 100644 --- a/crates/net/network/Cargo.toml +++ b/crates/net/network/Cargo.toml @@ -44,4 +44,8 @@ secp256k1 = { version = "0.24", features = [ ] } [dev-dependencies] +# reth +reth-interfaces = { path = "../../interfaces", features = ["test-utils"] } + rand = "0.8" + diff --git a/crates/net/network/src/config.rs b/crates/net/network/src/config.rs index e7be93e750..c7db2f4b9b 100644 --- a/crates/net/network/src/config.rs +++ b/crates/net/network/src/config.rs @@ -129,6 +129,18 @@ impl NetworkConfigBuilder { self } + /// Sets the socket address the network will listen on + pub fn listener_addr(mut self, listener_addr: SocketAddr) -> Self { + self.listener_addr = Some(listener_addr); + self + } + + /// Sets the socket address the discovery network will listen on + pub fn discovery_addr(mut self, discovery_addr: SocketAddr) -> Self { + self.discovery_addr = Some(discovery_addr); + self + } + /// Consumes the type and creates the actual [`NetworkConfig`] pub fn build(self) -> NetworkConfig { let Self { diff --git a/crates/net/network/src/lib.rs b/crates/net/network/src/lib.rs index 831f1a897d..a2a908142e 100644 --- a/crates/net/network/src/lib.rs +++ b/crates/net/network/src/lib.rs @@ -32,6 +32,6 @@ mod swarm; mod transactions; pub use config::NetworkConfig; -pub use manager::NetworkManager; +pub use manager::{NetworkEvent, NetworkManager}; pub use network::NetworkHandle; pub use peers::PeersConfig; diff --git a/crates/net/network/src/manager.rs b/crates/net/network/src/manager.rs index fd0cba7ab0..23858cf2e0 100644 --- a/crates/net/network/src/manager.rs +++ b/crates/net/network/src/manager.rs @@ -93,8 +93,6 @@ pub struct NetworkManager { /// This is updated via internal events and shared via `Arc` with the [`NetworkHandle`] /// Updated by the `NetworkWorker` and loaded by the `NetworkService`. num_active_peers: Arc, - /// Local copy of the `PeerId` of the local node. - local_peer_id: PeerId, } // === impl NetworkManager === @@ -158,7 +156,6 @@ where event_listeners: Default::default(), to_transactions: None, num_active_peers, - local_peer_id, }) } @@ -167,6 +164,21 @@ where self.to_transactions = Some(tx); } + /// Returns the [`SocketAddr`] that listens for incoming connections. + pub fn local_addr(&self) -> SocketAddr { + self.swarm.listener().local_address() + } + + /// How many peers we're currently connected to. + pub fn num_connected_peers(&self) -> usize { + self.swarm.state().num_connected_peers() + } + + /// Returns the [`PeerId`] used in the network. + pub fn peer_id(&self) -> &PeerId { + self.handle.peer_id() + } + /// Returns the [`NetworkHandle`] that can be cloned and shared. /// /// The [`NetworkHandle`] can be used to interact with this [`NetworkManager`] @@ -181,7 +193,7 @@ where _capabilities: Arc, _message: CapabilityMessage, ) { - trace!(?peer_id, target = "net", "received unexpected message"); + trace!(target : "net", ?peer_id, "received unexpected message"); // TODO: disconnect? } @@ -292,7 +304,7 @@ where } NetworkHandleMessage::AnnounceBlock(block, hash) => { if self.handle.mode().is_stake() { - error!(target = "net", "Block propagation is not supported in POS - [EIP-3675](https://eips.ethereum.org/EIPS/eip-3675#devp2p)"); + error!(target : "net", "Block propagation is not supported in POS - [EIP-3675](https://eips.ethereum.org/EIPS/eip-3675#devp2p)"); return } let msg = NewBlockMessage { hash, block: Arc::new(block) }; @@ -308,6 +320,12 @@ where .swarm .sessions_mut() .send_message(&peer_id, PeerMessage::PooledTransactions(msg)), + NetworkHandleMessage::AddPeerAddress(peer, addr) => { + self.swarm.state_mut().add_peer_address(peer, addr); + } + NetworkHandleMessage::DisconnectPeer(peer_id) => { + self.swarm.sessions_mut().disconnect(peer_id, None); + } } } } @@ -351,24 +369,24 @@ where this.on_invalid_message(peer_id, capabilities, message) } SwarmEvent::TcpListenerClosed { remote_addr } => { - trace!(?remote_addr, target = "net", "TCP listener closed."); + trace!(target : "net", ?remote_addr, "TCP listener closed."); } SwarmEvent::TcpListenerError(err) => { - trace!(?err, target = "net", "TCP connection error."); + trace!(target : "net", ?err, "TCP connection error."); } SwarmEvent::IncomingTcpConnection { remote_addr, .. } => { - trace!(?remote_addr, target = "net", "Incoming connection"); + trace!(target : "net",?remote_addr, "Incoming connection"); } SwarmEvent::OutgoingTcpConnection { remote_addr } => { - trace!(?remote_addr, target = "net", "Starting outbound connection."); + trace!(target : "net", ?remote_addr,"Starting outbound connection."); } SwarmEvent::SessionEstablished { peer_id, remote_addr, capabilities, messages } => { let total_active = this.num_active_peers.fetch_add(1, Ordering::Relaxed) + 1; trace!( + target : "net", ?remote_addr, ?peer_id, ?total_active, - target = "net", "Session established" ); @@ -381,11 +399,11 @@ where SwarmEvent::SessionClosed { peer_id, remote_addr, error } => { let total_active = this.num_active_peers.fetch_sub(1, Ordering::Relaxed) - 1; trace!( + target : "net", ?remote_addr, ?peer_id, ?total_active, ?error, - target = "net", "Session disconnected" ); @@ -408,11 +426,17 @@ where #[derive(Debug, Clone)] pub enum NetworkEvent { /// Closed the peer session. - SessionClosed { peer_id: PeerId }, + SessionClosed { + /// The identifier of the peer to which a session was closed. + peer_id: PeerId, + }, /// Established a new session with the given peer. SessionEstablished { + /// The identifier of the peer to which a session was established. peer_id: PeerId, + /// Capabilities the peer announced capabilities: Arc, + /// A request channel to the session task. messages: PeerRequestSender, }, } @@ -434,7 +458,7 @@ impl NetworkEventListeners { self.listeners.retain(|listener| { let open = listener.send(event.clone()).is_ok(); if !open { - trace!(target = "net", "event listener channel closed",); + trace!(target : "net", "event listener channel closed",); } open }); diff --git a/crates/net/network/src/network.rs b/crates/net/network/src/network.rs index 503d3e9678..e5487f1940 100644 --- a/crates/net/network/src/network.rs +++ b/crates/net/network/src/network.rs @@ -4,9 +4,13 @@ use reth_eth_wire::{NewBlock, NewPooledTransactionHashes, Transactions}; use reth_primitives::{PeerId, H256}; use std::{ net::SocketAddr, - sync::{atomic::AtomicUsize, Arc}, + sync::{ + atomic::{AtomicUsize, Ordering}, + Arc, + }, }; use tokio::sync::{mpsc, mpsc::UnboundedSender}; +use tokio_stream::wrappers::UnboundedReceiverStream; /// A _shareable_ network frontend. Used to interact with the network. /// @@ -40,15 +44,30 @@ impl NetworkHandle { Self { inner: Arc::new(inner) } } + /// How many peers we're currently connected to. + pub fn num_connected_peers(&self) -> usize { + self.inner.num_active_peers.load(Ordering::Relaxed) + } + + /// Returns the [`SocketAddr`] that listens for incoming connections. + pub fn local_addr(&self) -> SocketAddr { + *self.inner.listener_address.lock() + } + + /// Returns the [`PeerId`] used in the network. + pub fn peer_id(&self) -> &PeerId { + &self.inner.local_peer_id + } + fn manager(&self) -> &UnboundedSender { &self.inner.to_manager_tx } /// Creates a new [`NetworkEvent`] listener channel. - pub fn event_listener(&self) -> mpsc::UnboundedReceiver { + pub fn event_listener(&self) -> UnboundedReceiverStream { let (tx, rx) = mpsc::unbounded_channel(); let _ = self.manager().send(NetworkHandleMessage::EventListener(tx)); - rx + UnboundedReceiverStream::new(rx) } /// Returns the mode of the network, either pow, or pos @@ -61,8 +80,19 @@ impl NetworkHandle { let _ = self.inner.to_manager_tx.send(msg); } + /// Sends a message to the [`NetworkManager`] to add a peer to the known set + pub fn add_peer(&self, peer: PeerId, addr: SocketAddr) { + let _ = self.inner.to_manager_tx.send(NetworkHandleMessage::AddPeerAddress(peer, addr)); + } + + /// Sends a message to the [`NetworkManager`] to disconnect an existing connection to the given + /// peer. + pub fn disconnect_peer(&self, peer: PeerId) { + let _ = self.inner.to_manager_tx.send(NetworkHandleMessage::DisconnectPeer(peer)); + } + /// Sends a [`PeerRequest`] to the given peer's session. - pub fn send_request(&mut self, peer_id: PeerId, request: PeerRequest) { + pub fn send_request(&self, peer_id: PeerId, request: PeerRequest) { self.send_message(NetworkHandleMessage::EthRequest { peer_id, request }) } } @@ -85,6 +115,10 @@ struct NetworkInner { /// Internal messages that can be passed to the [`NetworkManager`](crate::NetworkManager). #[allow(missing_docs)] pub(crate) enum NetworkHandleMessage { + /// Adds an address for a peer. + AddPeerAddress(PeerId, SocketAddr), + /// Disconnect a connection to a peer if it exists. + DisconnectPeer(PeerId), /// Add a new listener for [`NetworkEvent`]. EventListener(UnboundedSender), /// Broadcast event to announce a new block to all nodes. diff --git a/crates/net/network/src/session/active.rs b/crates/net/network/src/session/active.rs index f5cf010de9..810fc4a703 100644 --- a/crates/net/network/src/session/active.rs +++ b/crates/net/network/src/session/active.rs @@ -216,7 +216,7 @@ impl ActiveSession { self.queued_outgoing.push_back(msg.into()); } Err(err) => { - error!(?err, target = "net", "Failed to respond to received request"); + error!(target : "net", ?err, "Failed to respond to received request"); } } } @@ -225,8 +225,8 @@ impl ActiveSession { fn emit_message(&self, message: PeerMessage) { let _ = self.try_emit_message(message).map_err(|err| { warn!( + target : "net", %err, - target = "net", "dropping incoming message", ); }); diff --git a/crates/net/network/src/session/mod.rs b/crates/net/network/src/session/mod.rs index 6af8d9f89d..55f1e561d4 100644 --- a/crates/net/network/src/session/mod.rs +++ b/crates/net/network/src/session/mod.rs @@ -220,8 +220,8 @@ impl SessionManager { return match event { ActiveSessionMessage::Disconnected { peer_id, remote_addr } => { trace!( + target : "net::session", ?peer_id, - target = "net::session", "gracefully disconnected active session." ); let _ = self.active_sessions.remove(&peer_id); @@ -232,7 +232,7 @@ impl SessionManager { remote_addr, error, } => { - trace!(?peer_id, ?error, target = "net::session", "closed session."); + trace!(target : "net::session", ?peer_id, ?error,"closed session."); let _ = self.active_sessions.remove(&peer_id); Poll::Ready(SessionEvent::SessionClosedOnConnectionError { remote_addr, @@ -260,9 +260,9 @@ impl SessionManager { match event { PendingSessionEvent::SuccessfulHandshake { remote_addr, session_id } => { trace!( + target : "net::session", ?session_id, ?remote_addr, - target = "net::session", "successful handshake" ); } @@ -321,9 +321,9 @@ impl SessionManager { } PendingSessionEvent::Disconnected { remote_addr, session_id, direction, error } => { trace!( + target : "net::session", ?session_id, ?remote_addr, - target = "net::session", "disconnected pending session" ); let _ = self.pending_sessions.remove(&session_id); @@ -350,11 +350,11 @@ impl SessionManager { error, } => { trace!( + target : "net::session", ?error, ?session_id, ?remote_addr, ?peer_id, - target = "net::session", "connection refused" ); let _ = self.pending_sessions.remove(&session_id); @@ -366,10 +366,10 @@ impl SessionManager { PendingSessionEvent::EciesAuthError { remote_addr, session_id, error } => { let _ = self.pending_sessions.remove(&session_id); warn!( + target : "net::session", ?error, ?session_id, ?remote_addr, - target = "net::session", "ecies auth failed" ); let _ = self.pending_sessions.remove(&session_id); diff --git a/crates/net/network/src/state.rs b/crates/net/network/src/state.rs index 8b3f7a6dde..ce317de360 100644 --- a/crates/net/network/src/state.rs +++ b/crates/net/network/src/state.rs @@ -80,6 +80,11 @@ where } } + /// How many peers we're currently connected to. + pub fn num_connected_peers(&self) -> usize { + self.connected_peers.len() + } + /// Event hook for an activated session for the peer. /// /// Returns `Ok` if the session is valid, returns an `Err` if the session is not accepted and @@ -92,7 +97,7 @@ where request_tx: PeerRequestSender, ) -> Result<(), AddSessionError> { // TODO add capacity check - debug_assert!(self.connected_peers.contains_key(&peer), "Already connected; not possible"); + debug_assert!(!self.connected_peers.contains_key(&peer), "Already connected; not possible"); // find the corresponding block number let block_number = @@ -210,6 +215,11 @@ where } } + /// Adds a peer and its address to the peerset. + pub(crate) fn add_peer_address(&mut self, peer_id: PeerId, addr: SocketAddr) { + self.peers_manager.add_discovered_node(peer_id, addr) + } + /// Event hook for events received from the discovery service. fn on_discovery_event(&mut self, event: DiscoveryEvent) { match event { @@ -318,8 +328,8 @@ where match response.poll(cx) { Poll::Ready(Err(_)) => { trace!( + target : "net", ?id, - target = "net", "Request canceled, response channel closed." ); disconnect_sessions.push(*id); diff --git a/crates/net/network/src/swarm.rs b/crates/net/network/src/swarm.rs index e3444b672a..401ff16792 100644 --- a/crates/net/network/src/swarm.rs +++ b/crates/net/network/src/swarm.rs @@ -53,11 +53,20 @@ where Self { incoming, sessions, state } } + /// Access to the state. + pub(crate) fn state(&self) -> &NetworkState { + &self.state + } + /// Mutable access to the state. pub(crate) fn state_mut(&mut self) -> &mut NetworkState { &mut self.state } + /// Access to the [`ConnectionListener`]. + pub(crate) fn listener(&self) -> &ConnectionListener { + &self.incoming + } /// Mutable access to the [`SessionManager`]. pub(crate) fn sessions_mut(&mut self) -> &mut SessionManager { &mut self.sessions diff --git a/crates/net/network/src/transactions.rs b/crates/net/network/src/transactions.rs index b08ba1b058..3ddcd51ef0 100644 --- a/crates/net/network/src/transactions.rs +++ b/crates/net/network/src/transactions.rs @@ -110,7 +110,7 @@ where Self { pool, network, - network_events: UnboundedReceiverStream::new(network_events), + network_events, inflight_requests: Default::default(), transactions_by_peers: Default::default(), pool_imports: Default::default(), diff --git a/crates/net/network/tests/it/connect.rs b/crates/net/network/tests/it/connect.rs new file mode 100644 index 0000000000..bb63e3c84c --- /dev/null +++ b/crates/net/network/tests/it/connect.rs @@ -0,0 +1,48 @@ +//! Connection tests + +use super::testnet::Testnet; +use futures::StreamExt; +use reth_network::NetworkEvent; +use std::collections::HashSet; + +#[tokio::test(flavor = "multi_thread")] +async fn test_establish_connections() { + let net = Testnet::create(3).await; + + net.for_each(|peer| assert_eq!(0, peer.num_peers())); + + let mut handles = net.handles(); + let handle0 = handles.next().unwrap(); + let handle1 = handles.next().unwrap(); + let handle2 = handles.next().unwrap(); + + drop(handles); + let handle = net.spawn(); + + let listener0 = handle0.event_listener(); + + handle0.add_peer(*handle1.peer_id(), handle1.local_addr()); + handle0.add_peer(*handle2.peer_id(), handle2.local_addr()); + + let mut expected_connections = HashSet::from([*handle1.peer_id(), *handle2.peer_id()]); + + let mut established = listener0.take(2); + while let Some(ev) = established.next().await { + match ev { + NetworkEvent::SessionClosed { .. } => { + panic!("unexpected event") + } + NetworkEvent::SessionEstablished { peer_id, .. } => { + assert!(expected_connections.remove(&peer_id)) + } + } + } + + assert!(expected_connections.is_empty()); + + let net = handle.terminate().await; + + net.for_each(|peer| { + assert!(peer.num_peers() >= 1); + }); +} diff --git a/crates/net/network/tests/it/main.rs b/crates/net/network/tests/it/main.rs new file mode 100644 index 0000000000..da8117f5a8 --- /dev/null +++ b/crates/net/network/tests/it/main.rs @@ -0,0 +1,5 @@ +mod connect; +mod testnet; +pub use testnet::*; + +fn main() {} diff --git a/crates/net/network/tests/it/testnet.rs b/crates/net/network/tests/it/testnet.rs new file mode 100644 index 0000000000..54af83682f --- /dev/null +++ b/crates/net/network/tests/it/testnet.rs @@ -0,0 +1,213 @@ +//! A network implementation for testing purposes. + +use futures::FutureExt; +use pin_project::pin_project; +use reth_interfaces::{provider::BlockProvider, test_utils::TestApi}; +use reth_network::{error::NetworkError, NetworkConfig, NetworkHandle, NetworkManager}; +use secp256k1::SecretKey; +use std::{ + fmt, + future::Future, + net::{Ipv4Addr, SocketAddr, SocketAddrV4}, + pin::Pin, + sync::Arc, + task::{Context, Poll}, +}; +use tokio::{sync::oneshot, task::JoinHandle}; + +/// A test network consisting of multiple peers. +#[derive(Default)] +pub struct Testnet { + /// All running peers in the network. + peers: Vec>, +} + +// === impl Testnet === + +impl Testnet +where + C: BlockProvider, +{ + pub fn peers_mut(&mut self) -> &mut [Peer] { + &mut self.peers + } + + pub fn peers(&self) -> &[Peer] { + &self.peers + } + + pub fn peers_iter_mut(&mut self) -> impl Iterator> + '_ { + self.peers.iter_mut() + } + + pub fn peers_iter(&self) -> impl Iterator> + '_ { + self.peers.iter() + } + + pub async fn add_peer_with_config( + &mut self, + config: PeerConfig, + ) -> Result<(), NetworkError> { + let PeerConfig { config, client, secret_key } = config; + + let network = NetworkManager::new(config).await?; + let peer = Peer { network, client, secret_key }; + self.peers.push(peer); + Ok(()) + } + + /// Returns all handles to the networks + pub fn handles(&self) -> impl Iterator + '_ { + self.peers.iter().map(|p| p.handle()) + } + + /// Apply a closure on each peer + pub fn for_each(&self, f: F) + where + F: Fn(&Peer), + { + self.peers.iter().for_each(f) + } +} + +impl Testnet +where + C: BlockProvider + 'static, +{ + /// Spawns the testnet to a separate task + pub fn spawn(self) -> TestnetHandle { + let (tx, rx) = oneshot::channel::>(); + let mut net = self; + let handle = tokio::task::spawn(async move { + let mut tx = None; + loop { + tokio::select! { + _ = &mut net => { break} + inc = rx => { + tx = inc.ok(); + break + } + } + } + if let Some(tx) = tx { + let _ = tx.send(net); + } + }); + + TestnetHandle { _handle: handle, terminate: tx } + } +} + +impl Testnet { + /// Same as [`Self::try_create`] but panics on error + pub async fn create(num_peers: usize) -> Self { + Self::try_create(num_peers).await.unwrap() + } + + /// Creates a new [`Testnet`] with the given number of peers + pub async fn try_create(num_peers: usize) -> Result { + let mut this = Testnet::default(); + for _ in 0..num_peers { + this.add_peer().await?; + } + Ok(this) + } + + pub async fn add_peer(&mut self) -> Result<(), NetworkError> { + self.add_peer_with_config(Default::default()).await + } +} + +impl fmt::Debug for Testnet { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.debug_struct("Testnet {{}}").finish_non_exhaustive() + } +} + +impl Future for Testnet +where + C: BlockProvider, +{ + type Output = (); + + fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + let this = self.get_mut(); + for peer in this.peers.iter_mut() { + let _ = peer.poll_unpin(cx); + } + Poll::Pending + } +} + +pub struct TestnetHandle { + _handle: JoinHandle<()>, + terminate: oneshot::Sender>>, +} + +// === impl TestnetHandle === + +impl TestnetHandle { + /// Terminates the task and returns the [`Testnet`] back. + pub async fn terminate(self) -> Testnet { + let (tx, rx) = oneshot::channel(); + self.terminate.send(tx).unwrap(); + rx.await.unwrap() + } +} + +#[pin_project] +pub struct Peer { + #[pin] + network: NetworkManager, + client: Arc, + secret_key: SecretKey, +} + +// === impl Peer === + +impl Peer +where + C: BlockProvider, +{ + pub fn num_peers(&self) -> usize { + self.network.num_connected_peers() + } + + /// The address that listens for incoming connections. + pub fn local_addr(&self) -> SocketAddr { + self.network.local_addr() + } + + pub fn handle(&self) -> NetworkHandle { + self.network.handle().clone() + } +} + +impl Future for Peer +where + C: BlockProvider, +{ + type Output = (); + + fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + self.project().network.poll(cx) + } +} + +pub struct PeerConfig { + config: NetworkConfig, + client: Arc, + secret_key: SecretKey, +} + +impl Default for PeerConfig { + fn default() -> Self { + let client = Arc::new(TestApi::default()); + let secret_key = SecretKey::new(&mut rand::thread_rng()); + let config = NetworkConfig::builder(Arc::clone(&client), secret_key) + .listener_addr(SocketAddr::V4(SocketAddrV4::new(Ipv4Addr::UNSPECIFIED, 0))) + .discovery_addr(SocketAddr::V4(SocketAddrV4::new(Ipv4Addr::UNSPECIFIED, 0))) + .build(); + Self { config, client, secret_key } + } +}