test(net): add e2e test for already connected (#538)

This commit is contained in:
Matthias Seitz
2022-12-20 18:02:16 +01:00
committed by GitHub
parent 7a13cf6688
commit f5c9064b44
4 changed files with 71 additions and 7 deletions

View File

@@ -572,6 +572,7 @@ where
"Session disconnected"
);
let mut reason = None;
if let Some(ref err) = error {
// If the connection was closed due to an error, we report the peer
this.swarm.state_mut().peers_mut().on_connection_dropped(
@@ -579,12 +580,13 @@ where
&peer_id,
err,
);
reason = err.as_disconnected();
} else {
// Gracefully disconnected
this.swarm.state_mut().peers_mut().on_disconnected(peer_id);
}
this.event_listeners.send(NetworkEvent::SessionClosed { peer_id });
this.event_listeners.send(NetworkEvent::SessionClosed { peer_id, reason });
}
SwarmEvent::IncomingPendingSessionClosed { remote_addr, error } => {
warn!(
@@ -651,6 +653,8 @@ pub enum NetworkEvent {
SessionClosed {
/// The identifier of the peer to which a session was closed.
peer_id: PeerId,
/// Why the disconnect was triggered
reason: Option<DisconnectReason>,
},
/// Established a new session with the given peer.
SessionEstablished {

View File

@@ -288,7 +288,7 @@ where
/// Handles a received event related to common network events.
fn on_network_event(&mut self, event: NetworkEvent) {
match event {
NetworkEvent::SessionClosed { peer_id } => {
NetworkEvent::SessionClosed { peer_id, .. } => {
// remove the peer
self.peers.remove(&peer_id);
}

View File

@@ -1,6 +1,6 @@
//! Connection tests
use crate::NetworkEventStream;
use crate::{NetworkEventStream, PeerConfig};
use super::testnet::Testnet;
use enr::{k256::ecdsa::SigningKey, Enr, EnrPublicKey};
@@ -8,6 +8,7 @@ use ethers_core::utils::Geth;
use ethers_providers::{Http, Middleware, Provider};
use futures::StreamExt;
use reth_discv4::{bootnodes::mainnet_nodes, Discv4Config, NodeRecord};
use reth_eth_wire::DisconnectReason;
use reth_net_common::ban_list::BanList;
use reth_network::{NetworkConfig, NetworkEvent, NetworkManager, PeersConfig};
use reth_primitives::PeerId;
@@ -79,6 +80,50 @@ async fn test_establish_connections() {
}
}
#[tokio::test(flavor = "multi_thread")]
async fn test_already_connected() {
reth_tracing::init_tracing();
let mut net = Testnet::default();
let secret_key = SecretKey::new(&mut rand::thread_rng());
let client = Arc::new(TestApi::default());
let p1 = PeerConfig::default();
// initialize two peers with the same identifier
let p2 = PeerConfig::with_secret_key(Arc::clone(&client), secret_key);
let p3 = PeerConfig::with_secret_key(Arc::clone(&client), secret_key);
net.extend_peer_with_config(vec![p1, p2, p3]).await.unwrap();
let mut handles = net.handles();
let handle0 = handles.next().unwrap();
let handle1 = handles.next().unwrap();
let handle2 = handles.next().unwrap();
drop(handles);
let _handle = net.spawn();
let mut listener0 = NetworkEventStream::new(handle0.event_listener());
let mut listener2 = NetworkEventStream::new(handle2.event_listener());
handle0.add_peer(*handle1.peer_id(), handle1.local_addr());
let peer = listener0.next_session_established().await.unwrap();
assert_eq!(peer, *handle1.peer_id());
handle2.add_peer(*handle0.peer_id(), handle0.local_addr());
let peer = listener2.next_session_established().await.unwrap();
assert_eq!(peer, *handle0.peer_id());
let (peer, reason) = listener2.next_session_closed().await.unwrap();
assert_eq!(peer, *handle0.peer_id());
let reason = reason.unwrap();
assert_eq!(reason, DisconnectReason::AlreadyConnected);
assert_eq!(handle0.num_connected_peers(), 1);
assert_eq!(handle1.num_connected_peers(), 1);
}
#[tokio::test(flavor = "multi_thread")]
#[ignore]
async fn test_connect_with_boot_nodes() {
@@ -181,7 +226,7 @@ async fn test_incoming_node_id_blacklist() {
assert_eq!(incoming_peer_id, geth_peer_id);
// check to see that the session was closed
let incoming_peer_id = event_stream.next_session_closed().await.unwrap();
let incoming_peer_id = event_stream.next_session_closed().await.unwrap().0;
assert_eq!(incoming_peer_id, geth_peer_id);
})
.await
@@ -327,7 +372,7 @@ async fn test_geth_disconnect() {
handle.disconnect_peer(geth_peer_id);
// wait for a disconnect from geth
if let Some(NetworkEvent::SessionClosed { peer_id }) = events.next().await {
if let Some(NetworkEvent::SessionClosed { peer_id, .. }) = events.next().await {
assert_eq!(peer_id, geth_peer_id);
} else {
panic!("Expected a session closed event");

View File

@@ -3,6 +3,7 @@
use futures::{FutureExt, StreamExt};
use parking_lot::Mutex;
use pin_project::pin_project;
use reth_eth_wire::DisconnectReason;
use reth_network::{
error::NetworkError, eth_requests::EthRequestHandler, NetworkConfig, NetworkEvent,
NetworkHandle, NetworkManager,
@@ -72,6 +73,16 @@ where
self.peers.iter()
}
pub async fn extend_peer_with_config(
&mut self,
configs: impl IntoIterator<Item = PeerConfig<C>>,
) -> Result<(), NetworkError> {
for config in configs {
self.add_peer_with_config(config).await?;
}
Ok(())
}
pub async fn add_peer_with_config(
&mut self,
config: PeerConfig<C>,
@@ -261,6 +272,10 @@ where
{
pub fn new(client: Arc<C>) -> Self {
let secret_key = SecretKey::new(&mut rand::thread_rng());
Self::with_secret_key(client, secret_key)
}
pub fn with_secret_key(client: Arc<C>, secret_key: SecretKey) -> Self {
let config = NetworkConfig::builder(Arc::clone(&client), secret_key)
.listener_addr(SocketAddr::V4(SocketAddrV4::new(Ipv4Addr::UNSPECIFIED, 0)))
.discovery_addr(SocketAddr::V4(SocketAddrV4::new(Ipv4Addr::UNSPECIFIED, 0)))
@@ -289,10 +304,10 @@ impl NetworkEventStream {
Self { inner }
}
pub async fn next_session_closed(&mut self) -> Option<PeerId> {
pub async fn next_session_closed(&mut self) -> Option<(PeerId, Option<DisconnectReason>)> {
while let Some(ev) = self.inner.next().await {
match ev {
NetworkEvent::SessionClosed { peer_id } => return Some(peer_id),
NetworkEvent::SessionClosed { peer_id, reason } => return Some((peer_id, reason)),
NetworkEvent::SessionEstablished { .. } => continue,
}
}