From 3779a225fbd43f666108835448090822b223466d Mon Sep 17 00:00:00 2001 From: Matthias Seitz Date: Fri, 14 Apr 2023 17:42:49 +0200 Subject: [PATCH] fix: remove peers after incoming connection closed (#2245) --- crates/net/network/src/manager.rs | 2 +- crates/net/network/src/peers/manager.rs | 94 ++++++++++++++++++++----- 2 files changed, 76 insertions(+), 20 deletions(-) diff --git a/crates/net/network/src/manager.rs b/crates/net/network/src/manager.rs index 360272056e..31d56a74c9 100644 --- a/crates/net/network/src/manager.rs +++ b/crates/net/network/src/manager.rs @@ -658,7 +658,7 @@ where this.swarm .state_mut() .peers_mut() - .on_active_inbound_session(peer_id, remote_addr); + .on_incoming_session_established(peer_id, remote_addr); } this.event_listeners.send(NetworkEvent::SessionEstablished { peer_id, diff --git a/crates/net/network/src/peers/manager.rs b/crates/net/network/src/peers/manager.rs index 43f24eb238..6dbc2fd7b5 100644 --- a/crates/net/network/src/peers/manager.rs +++ b/crates/net/network/src/peers/manager.rs @@ -177,6 +177,18 @@ impl PeersManager { self.peers.iter().map(|(peer_id, v)| NodeRecord::new(v.addr, *peer_id)) } + /// Returns the number of currently active inbound connections. + #[inline] + pub(crate) fn num_inbound_connections(&self) -> usize { + self.connection_info.num_inbound + } + + /// Returns the number of currently active outbound connections. + #[inline] + pub(crate) fn num_outbound_connections(&self) -> usize { + self.connection_info.num_outbound + } + /// Invoked when a new _incoming_ tcp connection is accepted. /// /// returns an error if the inbound ip address is on the ban list or @@ -207,18 +219,6 @@ impl PeersManager { self.connection_info.decr_in() } - /// Returns the number of currently active inbound connections. - #[inline] - pub(crate) fn num_inbound_connections(&self) -> usize { - self.connection_info.num_inbound - } - - /// Returns the number of currently active outbound connections. - #[inline] - pub(crate) fn num_outbound_connections(&self) -> usize { - self.connection_info.num_outbound - } - /// Invoked when a pending session was closed. pub(crate) fn on_incoming_pending_session_dropped( &mut self, @@ -243,7 +243,7 @@ impl PeersManager { /// /// If the reputation of the peer is below the `BANNED_REPUTATION` threshold, a disconnect will /// be scheduled. - pub(crate) fn on_active_inbound_session(&mut self, peer_id: PeerId, addr: SocketAddr) { + pub(crate) fn on_incoming_session_established(&mut self, peer_id: PeerId, addr: SocketAddr) { // we only need to check the peer id here as the ip address will have been checked at // on_inbound_pending_session if self.ban_list.is_banned_peer(&peer_id) { @@ -264,7 +264,11 @@ impl PeersManager { value.state = PeerConnectionState::In; } Entry::Vacant(entry) => { - entry.insert(Peer::with_state(addr, PeerConnectionState::In)); + // peer is missing in the table, we add it but mark it as to be removed after + // disconnect, because we only know the outgoing port + let mut peer = Peer::with_state(addr, PeerConnectionState::In); + peer.remove_after_disconnect = true; + entry.insert(peer); self.queued_actions.push_back(PeerAction::PeerAdded(peer_id)); } } @@ -529,10 +533,18 @@ impl PeersManager { match self.peers.entry(peer_id) { Entry::Occupied(mut entry) => { - let node = entry.get_mut(); - node.kind = kind; - node.addr = addr; - node.fork_id = fork_id; + let peer = entry.get_mut(); + peer.kind = kind; + peer.fork_id = fork_id; + peer.addr = addr; + + if peer.state.is_incoming() { + // now that we have an actual discovered address, for that peer and not just the + // ip of the incoming connection, we don't need to remove the peer after + // disconnecting, See `on_incoming_session_established` + peer.remove_after_disconnect = false; + } + return } Entry::Vacant(entry) => { @@ -914,6 +926,12 @@ impl PeerConnectionState { } } + /// Returns true if this is an active incoming connection. + #[inline] + fn is_incoming(&self) -> bool { + matches!(self, PeerConnectionState::In) + } + /// Returns whether we're currently connected with this peer #[inline] fn is_connected(&self) -> bool { @@ -1800,7 +1818,7 @@ mod test { let ban_list = BanList::new(vec![given_peer_id], HashSet::new()); let config = PeersConfig::default().with_ban_list(ban_list); let mut peer_manager = PeersManager::new(config); - peer_manager.on_active_inbound_session(given_peer_id, socket_addr); + peer_manager.on_incoming_session_established(given_peer_id, socket_addr); let Some(PeerAction::DisconnectBannedIncoming { peer_id }) = peer_manager.queued_actions.pop_front() else { panic!() }; @@ -1953,4 +1971,42 @@ mod test { // tick applied assert!(peer_manager.peers.get_mut(&peer_id).unwrap().reputation >= DEFAULT_REPUTATION); } + + #[tokio::test] + async fn test_remove_incoming_after_disconnect() { + let peer_id = PeerId::random(); + let addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 2)), 8009); + let mut peers = PeersManager::default(); + + peers.on_incoming_pending_session(addr.ip()).unwrap(); + peers.on_incoming_session_established(peer_id, addr); + let peer = peers.peers.get(&peer_id).unwrap(); + assert_eq!(peer.state, PeerConnectionState::In); + assert!(peer.remove_after_disconnect); + + peers.on_active_session_gracefully_closed(peer_id); + assert!(peers.peers.get(&peer_id).is_none()) + } + + #[tokio::test] + async fn test_keep_incoming_after_disconnect_if_discovered() { + let peer_id = PeerId::random(); + let addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 2)), 8009); + let mut peers = PeersManager::default(); + + peers.on_incoming_pending_session(addr.ip()).unwrap(); + peers.on_incoming_session_established(peer_id, addr); + let peer = peers.peers.get(&peer_id).unwrap(); + assert_eq!(peer.state, PeerConnectionState::In); + assert!(peer.remove_after_disconnect); + + // trigger discovery manually while the peer is still connected + peers.add_peer(peer_id, addr, None); + + peers.on_active_session_gracefully_closed(peer_id); + + let peer = peers.peers.get(&peer_id).unwrap(); + assert_eq!(peer.state, PeerConnectionState::Idle); + assert!(!peer.remove_after_disconnect); + } }