mirror of
https://github.com/paradigmxyz/reth.git
synced 2026-01-30 01:28:21 -05:00
fix: decrease connection info based on current state (#7165)
This commit is contained in:
@@ -796,7 +796,7 @@ where
|
||||
);
|
||||
|
||||
if let Some(ref err) = error {
|
||||
self.swarm.state_mut().peers_mut().on_pending_session_dropped(
|
||||
self.swarm.state_mut().peers_mut().on_outgoing_pending_session_dropped(
|
||||
&remote_addr,
|
||||
&peer_id,
|
||||
err,
|
||||
@@ -809,7 +809,7 @@ where
|
||||
self.swarm
|
||||
.state_mut()
|
||||
.peers_mut()
|
||||
.on_pending_session_gracefully_closed(&peer_id);
|
||||
.on_outgoing_pending_session_gracefully_closed(&peer_id);
|
||||
}
|
||||
self.metrics.closed_sessions.increment(1);
|
||||
self.metrics
|
||||
|
||||
@@ -427,19 +427,16 @@ impl PeersManager {
|
||||
}
|
||||
|
||||
/// Gracefully disconnected a pending _outgoing_ session
|
||||
pub(crate) fn on_pending_session_gracefully_closed(&mut self, peer_id: &PeerId) {
|
||||
pub(crate) fn on_outgoing_pending_session_gracefully_closed(&mut self, peer_id: &PeerId) {
|
||||
if let Some(peer) = self.peers.get_mut(peer_id) {
|
||||
self.connection_info.decr_state(peer.state);
|
||||
peer.state = PeerConnectionState::Idle;
|
||||
} else {
|
||||
return
|
||||
}
|
||||
|
||||
self.connection_info.decr_out();
|
||||
}
|
||||
|
||||
/// Invoked when an _outgoing_ pending session was closed during authentication or the
|
||||
/// handshake.
|
||||
pub(crate) fn on_pending_session_dropped(
|
||||
pub(crate) fn on_outgoing_pending_session_dropped(
|
||||
&mut self,
|
||||
remote_addr: &SocketAddr,
|
||||
peer_id: &PeerId,
|
||||
@@ -1675,7 +1672,7 @@ mod tests {
|
||||
})
|
||||
.await;
|
||||
|
||||
peers.on_pending_session_dropped(
|
||||
peers.on_outgoing_pending_session_dropped(
|
||||
&socket_addr,
|
||||
&peer,
|
||||
&PendingSessionHandshakeError::Eth(EthStreamError::EthHandshakeError(
|
||||
@@ -1838,7 +1835,7 @@ mod tests {
|
||||
})
|
||||
.await;
|
||||
|
||||
peers.on_pending_session_dropped(
|
||||
peers.on_outgoing_pending_session_dropped(
|
||||
&socket_addr,
|
||||
&peer,
|
||||
&PendingSessionHandshakeError::Eth(
|
||||
@@ -1888,7 +1885,7 @@ mod tests {
|
||||
})
|
||||
.await;
|
||||
|
||||
peers.on_pending_session_dropped(
|
||||
peers.on_outgoing_pending_session_dropped(
|
||||
&socket_addr,
|
||||
&peer,
|
||||
&PendingSessionHandshakeError::Eth(EthStreamError::P2PStreamError(
|
||||
@@ -2204,6 +2201,38 @@ mod tests {
|
||||
assert_eq!(peers.num_outbound_connections(), 0);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_outgoing_connection_gracefully_closed() {
|
||||
let peer = PeerId::random();
|
||||
let socket_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 2)), 8008);
|
||||
let mut peers = PeersManager::default();
|
||||
peers.add_peer(peer, socket_addr, None);
|
||||
|
||||
match event!(peers) {
|
||||
PeerAction::PeerAdded(peer_id) => {
|
||||
assert_eq!(peer_id, peer);
|
||||
}
|
||||
_ => unreachable!(),
|
||||
}
|
||||
match event!(peers) {
|
||||
PeerAction::Connect { peer_id, remote_addr } => {
|
||||
assert_eq!(peer_id, peer);
|
||||
assert_eq!(remote_addr, socket_addr);
|
||||
}
|
||||
_ => unreachable!(),
|
||||
}
|
||||
|
||||
let p = peers.peers.get(&peer).unwrap();
|
||||
assert_eq!(p.state, PeerConnectionState::PendingOut);
|
||||
|
||||
assert_eq!(peers.num_outbound_connections(), 0);
|
||||
|
||||
peers.on_outgoing_pending_session_gracefully_closed(&peer);
|
||||
|
||||
assert_eq!(peers.num_outbound_connections(), 0);
|
||||
assert_eq!(peers.connection_info.num_pending_out, 0);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_discovery_ban_list() {
|
||||
let ip = IpAddr::V4(Ipv4Addr::new(127, 0, 1, 2));
|
||||
|
||||
Reference in New Issue
Block a user