From f5c9064b44e1059990e3a78771edb86ccfc2c6ad Mon Sep 17 00:00:00 2001 From: Matthias Seitz Date: Tue, 20 Dec 2022 18:02:16 +0100 Subject: [PATCH] test(net): add e2e test for already connected (#538) --- crates/net/network/src/manager.rs | 6 ++- crates/net/network/src/transactions.rs | 2 +- crates/net/network/tests/it/connect.rs | 51 ++++++++++++++++++++++++-- crates/net/network/tests/it/testnet.rs | 19 +++++++++- 4 files changed, 71 insertions(+), 7 deletions(-) diff --git a/crates/net/network/src/manager.rs b/crates/net/network/src/manager.rs index 8a7e1c8b5a..2d23962c73 100644 --- a/crates/net/network/src/manager.rs +++ b/crates/net/network/src/manager.rs @@ -572,6 +572,7 @@ where "Session disconnected" ); + let mut reason = None; if let Some(ref err) = error { // If the connection was closed due to an error, we report the peer this.swarm.state_mut().peers_mut().on_connection_dropped( @@ -579,12 +580,13 @@ where &peer_id, err, ); + reason = err.as_disconnected(); } else { // Gracefully disconnected this.swarm.state_mut().peers_mut().on_disconnected(peer_id); } - this.event_listeners.send(NetworkEvent::SessionClosed { peer_id }); + this.event_listeners.send(NetworkEvent::SessionClosed { peer_id, reason }); } SwarmEvent::IncomingPendingSessionClosed { remote_addr, error } => { warn!( @@ -651,6 +653,8 @@ pub enum NetworkEvent { SessionClosed { /// The identifier of the peer to which a session was closed. peer_id: PeerId, + /// Why the disconnect was triggered + reason: Option, }, /// Established a new session with the given peer. SessionEstablished { diff --git a/crates/net/network/src/transactions.rs b/crates/net/network/src/transactions.rs index 9087936030..62c6711f2d 100644 --- a/crates/net/network/src/transactions.rs +++ b/crates/net/network/src/transactions.rs @@ -288,7 +288,7 @@ where /// Handles a received event related to common network events. fn on_network_event(&mut self, event: NetworkEvent) { match event { - NetworkEvent::SessionClosed { peer_id } => { + NetworkEvent::SessionClosed { peer_id, .. } => { // remove the peer self.peers.remove(&peer_id); } diff --git a/crates/net/network/tests/it/connect.rs b/crates/net/network/tests/it/connect.rs index c8c0634b91..b568407105 100644 --- a/crates/net/network/tests/it/connect.rs +++ b/crates/net/network/tests/it/connect.rs @@ -1,6 +1,6 @@ //! Connection tests -use crate::NetworkEventStream; +use crate::{NetworkEventStream, PeerConfig}; use super::testnet::Testnet; use enr::{k256::ecdsa::SigningKey, Enr, EnrPublicKey}; @@ -8,6 +8,7 @@ use ethers_core::utils::Geth; use ethers_providers::{Http, Middleware, Provider}; use futures::StreamExt; use reth_discv4::{bootnodes::mainnet_nodes, Discv4Config, NodeRecord}; +use reth_eth_wire::DisconnectReason; use reth_net_common::ban_list::BanList; use reth_network::{NetworkConfig, NetworkEvent, NetworkManager, PeersConfig}; use reth_primitives::PeerId; @@ -79,6 +80,50 @@ async fn test_establish_connections() { } } +#[tokio::test(flavor = "multi_thread")] +async fn test_already_connected() { + reth_tracing::init_tracing(); + let mut net = Testnet::default(); + + let secret_key = SecretKey::new(&mut rand::thread_rng()); + let client = Arc::new(TestApi::default()); + let p1 = PeerConfig::default(); + + // initialize two peers with the same identifier + let p2 = PeerConfig::with_secret_key(Arc::clone(&client), secret_key); + let p3 = PeerConfig::with_secret_key(Arc::clone(&client), secret_key); + + net.extend_peer_with_config(vec![p1, p2, p3]).await.unwrap(); + + let mut handles = net.handles(); + let handle0 = handles.next().unwrap(); + let handle1 = handles.next().unwrap(); + let handle2 = handles.next().unwrap(); + + drop(handles); + let _handle = net.spawn(); + + let mut listener0 = NetworkEventStream::new(handle0.event_listener()); + let mut listener2 = NetworkEventStream::new(handle2.event_listener()); + + handle0.add_peer(*handle1.peer_id(), handle1.local_addr()); + + let peer = listener0.next_session_established().await.unwrap(); + assert_eq!(peer, *handle1.peer_id()); + + handle2.add_peer(*handle0.peer_id(), handle0.local_addr()); + let peer = listener2.next_session_established().await.unwrap(); + assert_eq!(peer, *handle0.peer_id()); + + let (peer, reason) = listener2.next_session_closed().await.unwrap(); + assert_eq!(peer, *handle0.peer_id()); + let reason = reason.unwrap(); + assert_eq!(reason, DisconnectReason::AlreadyConnected); + + assert_eq!(handle0.num_connected_peers(), 1); + assert_eq!(handle1.num_connected_peers(), 1); +} + #[tokio::test(flavor = "multi_thread")] #[ignore] async fn test_connect_with_boot_nodes() { @@ -181,7 +226,7 @@ async fn test_incoming_node_id_blacklist() { assert_eq!(incoming_peer_id, geth_peer_id); // check to see that the session was closed - let incoming_peer_id = event_stream.next_session_closed().await.unwrap(); + let incoming_peer_id = event_stream.next_session_closed().await.unwrap().0; assert_eq!(incoming_peer_id, geth_peer_id); }) .await @@ -327,7 +372,7 @@ async fn test_geth_disconnect() { handle.disconnect_peer(geth_peer_id); // wait for a disconnect from geth - if let Some(NetworkEvent::SessionClosed { peer_id }) = events.next().await { + if let Some(NetworkEvent::SessionClosed { peer_id, .. }) = events.next().await { assert_eq!(peer_id, geth_peer_id); } else { panic!("Expected a session closed event"); diff --git a/crates/net/network/tests/it/testnet.rs b/crates/net/network/tests/it/testnet.rs index fed1698e98..37e6934a5d 100644 --- a/crates/net/network/tests/it/testnet.rs +++ b/crates/net/network/tests/it/testnet.rs @@ -3,6 +3,7 @@ use futures::{FutureExt, StreamExt}; use parking_lot::Mutex; use pin_project::pin_project; +use reth_eth_wire::DisconnectReason; use reth_network::{ error::NetworkError, eth_requests::EthRequestHandler, NetworkConfig, NetworkEvent, NetworkHandle, NetworkManager, @@ -72,6 +73,16 @@ where self.peers.iter() } + pub async fn extend_peer_with_config( + &mut self, + configs: impl IntoIterator>, + ) -> Result<(), NetworkError> { + for config in configs { + self.add_peer_with_config(config).await?; + } + Ok(()) + } + pub async fn add_peer_with_config( &mut self, config: PeerConfig, @@ -261,6 +272,10 @@ where { pub fn new(client: Arc) -> Self { let secret_key = SecretKey::new(&mut rand::thread_rng()); + Self::with_secret_key(client, secret_key) + } + + pub fn with_secret_key(client: Arc, secret_key: SecretKey) -> Self { let config = NetworkConfig::builder(Arc::clone(&client), secret_key) .listener_addr(SocketAddr::V4(SocketAddrV4::new(Ipv4Addr::UNSPECIFIED, 0))) .discovery_addr(SocketAddr::V4(SocketAddrV4::new(Ipv4Addr::UNSPECIFIED, 0))) @@ -289,10 +304,10 @@ impl NetworkEventStream { Self { inner } } - pub async fn next_session_closed(&mut self) -> Option { + pub async fn next_session_closed(&mut self) -> Option<(PeerId, Option)> { while let Some(ev) = self.inner.next().await { match ev { - NetworkEvent::SessionClosed { peer_id } => return Some(peer_id), + NetworkEvent::SessionClosed { peer_id, reason } => return Some((peer_id, reason)), NetworkEvent::SessionEstablished { .. } => continue, } }