diff --git a/crates/net/network/src/peers/manager.rs b/crates/net/network/src/peers/manager.rs index 4bf31fad91..983955a48e 100644 --- a/crates/net/network/src/peers/manager.rs +++ b/crates/net/network/src/peers/manager.rs @@ -221,19 +221,19 @@ impl PeersManager { if self.ban_list.is_banned_ip(&addr) { return Err(InboundConnectionError::IpBanned) } - self.connection_info.inc_in(); + self.connection_info.inc_pending_in(); Ok(()) } /// Invoked when a previous call to [Self::on_incoming_pending_session] succeeded but it was /// rejected. pub(crate) fn on_incoming_pending_session_rejected_internally(&mut self) { - self.connection_info.decr_in(); + self.connection_info.decr_pending_in(); } /// Invoked when a pending session was closed. pub(crate) fn on_incoming_pending_session_gracefully_closed(&mut self) { - self.connection_info.decr_in() + self.connection_info.decr_pending_in() } /// Invoked when a pending session was closed. @@ -251,7 +251,7 @@ impl PeersManager { } } - self.connection_info.decr_in() + self.connection_info.decr_pending_in(); } /// Called when a new _incoming_ active session was established to the given peer. @@ -271,6 +271,10 @@ impl PeersManager { // start a new tick, so the peer is not immediately rewarded for the time since last tick self.tick(); + let has_in_capacity = self.connection_info.has_in_capacity(); + self.connection_info.decr_pending_in(); + self.connection_info.inc_in(); + match self.peers.entry(peer_id) { Entry::Occupied(mut entry) => { let value = entry.get_mut(); @@ -281,7 +285,7 @@ impl PeersManager { value.state = PeerConnectionState::In; // if a peer is not trusted and we don't have capacity for more inbound connections, // disconnecting the peer - if !value.is_trusted() && !self.connection_info.has_in_capacity() { + if !value.is_trusted() && !has_in_capacity { self.queued_actions.push_back(PeerAction::Disconnect { peer_id, reason: Some(DisconnectReason::TooManyPeers), @@ -297,7 +301,7 @@ impl PeersManager { self.queued_actions.push_back(PeerAction::PeerAdded(peer_id)); // disconnect the peer if we don't have capacity for more inbound connections - if !self.connection_info.has_in_capacity() { + if !has_in_capacity { self.queued_actions.push_back(PeerAction::Disconnect { peer_id, reason: Some(DisconnectReason::TooManyPeers), @@ -586,7 +590,7 @@ impl PeersManager { match direction { Direction::Incoming => { // need to decrement the ingoing counter - self.connection_info.decr_in(); + self.connection_info.decr_pending_in(); } Direction::Outgoing(_) => { // need to decrement the outgoing counter @@ -763,7 +767,7 @@ impl PeersManager { PeerAction::Connect { peer_id, remote_addr: peer.addr } }; - self.connection_info.inc_pendingout(); + self.connection_info.inc_pending_out(); self.queued_actions.push_back(action); } @@ -865,10 +869,13 @@ pub struct ConnectionInfo { num_outbound: usize, /// Counter for pending outbound connections. #[cfg_attr(feature = "serde", serde(skip))] - num_pendingout: usize, + num_pending_out: usize, /// Counter for currently occupied slots for active inbound connections. #[cfg_attr(feature = "serde", serde(skip))] num_inbound: usize, + /// Counter for pending inbound connections. + #[cfg_attr(feature = "serde", serde(skip))] + num_pending_in: usize, /// Maximum allowed outbound connections. max_outbound: usize, /// Maximum allowed inbound connections. @@ -883,13 +890,13 @@ pub struct ConnectionInfo { impl ConnectionInfo { /// Returns `true` if there's still capacity for a new outgoing connection. fn has_out_capacity(&self) -> bool { - self.num_pendingout < self.max_concurrent_outbound_dials && + self.num_pending_out < self.max_concurrent_outbound_dials && self.num_outbound < self.max_outbound } /// Returns `true` if there's still capacity for a new incoming connection. fn has_in_capacity(&self) -> bool { - self.num_inbound <= self.max_inbound + self.num_inbound < self.max_inbound } fn decr_state(&mut self, state: PeerConnectionState) { @@ -897,7 +904,7 @@ impl ConnectionInfo { PeerConnectionState::Idle => {} PeerConnectionState::DisconnectingIn | PeerConnectionState::In => self.decr_in(), PeerConnectionState::DisconnectingOut | PeerConnectionState::Out => self.decr_out(), - PeerConnectionState::PendingOut => self.decr_pendingout(), + PeerConnectionState::PendingOut => self.decr_pending_out(), } } @@ -909,20 +916,28 @@ impl ConnectionInfo { self.num_outbound += 1; } - fn inc_pendingout(&mut self) { - self.num_pendingout += 1; + fn inc_pending_out(&mut self) { + self.num_pending_out += 1; } fn inc_in(&mut self) { self.num_inbound += 1; } + fn inc_pending_in(&mut self) { + self.num_pending_in += 1; + } + fn decr_in(&mut self) { self.num_inbound -= 1; } - fn decr_pendingout(&mut self) { - self.num_pendingout -= 1; + fn decr_pending_out(&mut self) { + self.num_pending_out -= 1; + } + + fn decr_pending_in(&mut self) { + self.num_pending_in -= 1; } } @@ -934,7 +949,8 @@ impl Default for ConnectionInfo { max_outbound: DEFAULT_MAX_COUNT_PEERS_OUTBOUND as usize, max_inbound: DEFAULT_MAX_COUNT_PEERS_INBOUND as usize, max_concurrent_outbound_dials: DEFAULT_MAX_COUNT_CONCURRENT_OUTBOUND_DIALS, - num_pendingout: 0, + num_pending_out: 0, + num_pending_in: 0, } } } @@ -1908,9 +1924,9 @@ mod tests { let mut peers = PeersManager::default(); assert!(peers.on_incoming_pending_session(socket_addr.ip()).is_ok()); - assert_eq!(peers.connection_info.num_inbound, 1); + assert_eq!(peers.connection_info.num_pending_in, 1); peers.on_incoming_pending_session_rejected_internally(); - assert_eq!(peers.connection_info.num_inbound, 0); + assert_eq!(peers.connection_info.num_pending_in, 0); } #[tokio::test] @@ -1919,9 +1935,9 @@ mod tests { let mut peers = PeersManager::default(); assert!(peers.on_incoming_pending_session(socket_addr.ip()).is_ok()); - assert_eq!(peers.connection_info.num_inbound, 1); + assert_eq!(peers.connection_info.num_pending_in, 1); peers.on_incoming_pending_session_gracefully_closed(); - assert_eq!(peers.connection_info.num_inbound, 0); + assert_eq!(peers.connection_info.num_pending_in, 0); } #[tokio::test] @@ -1932,7 +1948,7 @@ mod tests { let mut peers = PeersManager::new(config); assert!(peers.on_incoming_pending_session(socket_addr.ip()).is_ok()); - assert_eq!(peers.connection_info.num_inbound, 1); + assert_eq!(peers.connection_info.num_pending_in, 1); let err = PendingSessionHandshakeError::Eth(EthStreamError::P2PStreamError( P2PStreamError::HandshakeError(P2PHandshakeError::Disconnected( DisconnectReason::UselessPeer, @@ -1940,7 +1956,7 @@ mod tests { )); peers.on_incoming_pending_session_dropped(socket_addr, &err); - assert_eq!(peers.connection_info.num_inbound, 0); + assert_eq!(peers.connection_info.num_pending_in, 0); assert!(peers.ban_list.is_banned_ip(&socket_addr.ip())); assert!(peers.on_incoming_pending_session(socket_addr.ip()).is_err()); @@ -2007,20 +2023,32 @@ mod tests { let peer = PeerId::random(); let socket_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 2)), 8008); let mut peers = PeersManager::default(); - assert!(peers.on_incoming_pending_session(socket_addr.ip()).is_ok()); - peers.on_incoming_session_established(peer, socket_addr); - // peer should have been added and num_inbound should have been increased + // Attempt to establish an incoming session, expecting `num_pending_in` to increase by 1 + assert!(peers.on_incoming_pending_session(socket_addr.ip()).is_ok()); + assert_eq!(peers.connection_info.num_pending_in, 1); + + // Establish a session with the peer, expecting the peer to be added and the `num_inbound` + // to increase by 1 + peers.on_incoming_session_established(peer, socket_addr); let p = peers.peers.get_mut(&peer).expect("peer not found"); assert_eq!(p.addr, socket_addr); + assert_eq!(peers.connection_info.num_pending_in, 0); assert_eq!(peers.connection_info.num_inbound, 1); + // Attempt to establish another incoming session, expecting the `num_pending_in` to increase + // by 1 assert!(peers.on_incoming_pending_session(socket_addr.ip()).is_ok()); + assert_eq!(peers.connection_info.num_pending_in, 1); + + // Simulate a rejection due to an already established connection, expecting the + // `num_pending_in` to decrease by 1. The peer should remain connected and the `num_inbound` + // should not be changed. peers.on_already_connected(Direction::Incoming); - // peer should not be connected and num_inbound should not have been increased let p = peers.peers.get_mut(&peer).expect("peer not found"); assert_eq!(p.addr, socket_addr); + assert_eq!(peers.connection_info.num_pending_in, 0); assert_eq!(peers.connection_info.num_inbound, 1); } @@ -2458,7 +2486,7 @@ mod tests { peer_manager.fill_outbound_slots(); // all dialed connections should be in 'PendingOut' state - let dials = peer_manager.connection_info.num_pendingout; + let dials = peer_manager.connection_info.num_pending_out; assert_eq!(dials, peer_manager.connection_info.max_concurrent_outbound_dials); let num_pendingout_states = peer_manager @@ -2483,6 +2511,6 @@ mod tests { } // no more pending outbound connections - assert_eq!(peer_manager.connection_info.num_pendingout, 0); + assert_eq!(peer_manager.connection_info.num_pending_out, 0); } }