diff --git a/crates/net/common/src/ban_list.rs b/crates/net/common/src/ban_list.rs index 6d162b4b09..44505fc301 100644 --- a/crates/net/common/src/ban_list.rs +++ b/crates/net/common/src/ban_list.rs @@ -47,21 +47,28 @@ impl BanList { evicted } - /// Removes all entries that should no longer be banned. - pub fn evict(&mut self, now: Instant) { - self.banned_ips.retain(|_, until| { + /// Removes all ip addresses that are no longer banned. + pub fn evict_ips(&mut self, now: Instant) -> Vec { + let mut evicted = Vec::new(); + self.banned_ips.retain(|peer, until| { if let Some(until) = until { - return *until > now + if now > *until { + evicted.push(*peer); + return false + } } true }); + evicted + } - self.banned_peers.retain(|_, until| { - if let Some(until) = until { - return *until > now - } - true - }); + /// Removes all entries that should no longer be banned. + /// + /// Returns the evicted entries. + pub fn evict(&mut self, now: Instant) -> (Vec, Vec) { + let ips = self.evict_ips(now); + let peers = self.evict_peers(now); + (ips, peers) } /// Returns true if either the given peer id _or_ ip address is banned. diff --git a/crates/net/network/src/error.rs b/crates/net/network/src/error.rs index 7eef42a3f8..403aaef59f 100644 --- a/crates/net/network/src/error.rs +++ b/crates/net/network/src/error.rs @@ -63,6 +63,15 @@ impl SessionError for EthStreamError { P2PStreamError::HandshakeError( P2PHandshakeError::NonHelloMessageInHandshake ) | + P2PStreamError::HandshakeError(P2PHandshakeError::Disconnected( + DisconnectReason::UselessPeer + )) | + P2PStreamError::HandshakeError(P2PHandshakeError::Disconnected( + DisconnectReason::IncompatibleP2PProtocolVersion + )) | + P2PStreamError::HandshakeError(P2PHandshakeError::Disconnected( + DisconnectReason::ProtocolBreach + )) | P2PStreamError::UnknownReservedMessageId(_) | P2PStreamError::EmptyProtocolMessage | P2PStreamError::ParseVersionError(_) | @@ -70,6 +79,7 @@ impl SessionError for EthStreamError { P2PStreamError::Disconnected( DisconnectReason::IncompatibleP2PProtocolVersion ) | + P2PStreamError::Disconnected(DisconnectReason::ProtocolBreach) | P2PStreamError::MismatchedProtocolVersion { .. } ) } @@ -117,6 +127,17 @@ impl SessionError for PendingSessionHandshakeError { mod tests { use super::*; + #[test] + fn test_is_fatal_disconnect() { + let err = PendingSessionHandshakeError::Eth(EthStreamError::P2PStreamError( + P2PStreamError::HandshakeError(P2PHandshakeError::Disconnected( + DisconnectReason::UselessPeer, + )), + )); + + assert!(err.is_fatal_protocol_error()); + } + #[test] fn test_should_backoff() { let err = EthStreamError::P2PStreamError(P2PStreamError::HandshakeError( diff --git a/crates/net/network/src/manager.rs b/crates/net/network/src/manager.rs index 680cc85055..a2cd46e8c8 100644 --- a/crates/net/network/src/manager.rs +++ b/crates/net/network/src/manager.rs @@ -18,7 +18,7 @@ use crate::{ config::NetworkConfig, discovery::Discovery, - error::{NetworkError, SessionError}, + error::NetworkError, eth_requests::IncomingEthRequest, import::{BlockImport, BlockImportOutcome, BlockValidation}, listener::ConnectionListener, @@ -597,10 +597,17 @@ where ?error, "Incoming pending session failed" ); - this.swarm.state_mut().peers_mut().on_closed_incoming_pending_session(); - if error.map(|err| err.merits_discovery_ban()).unwrap_or_default() { - this.swarm.state_mut().ban_ip_discovery(remote_addr.ip()); + if let Some(ref err) = error { + this.swarm + .state_mut() + .peers_mut() + .on_incoming_pending_session_dropped(remote_addr, err); + } else { + this.swarm + .state_mut() + .peers_mut() + .on_incoming_pending_session_gracefully_closed(); } } SwarmEvent::OutgoingPendingSessionClosed { remote_addr, peer_id, error } => { diff --git a/crates/net/network/src/peers/manager.rs b/crates/net/network/src/peers/manager.rs index d9d3577bf2..8767f64de1 100644 --- a/crates/net/network/src/peers/manager.rs +++ b/crates/net/network/src/peers/manager.rs @@ -139,7 +139,7 @@ impl PeersManager { /// /// returns an error if the inbound ip address is on the ban list or /// we have reached our limit for max inbound connections - pub(crate) fn on_inbound_pending_session( + pub(crate) fn on_incoming_pending_session( &mut self, addr: IpAddr, ) -> Result<(), InboundConnectionError> { @@ -155,8 +155,32 @@ impl PeersManager { Ok(()) } + /// Invoked when a previous call to [Self::on_incoming_pending_session] succeeded but it was + /// rejected. + pub(crate) fn on_incoming_pending_session_rejected_internally(&mut self) { + self.connection_info.decr_in(); + } + /// Invoked when a pending session was closed. - pub(crate) fn on_closed_incoming_pending_session(&mut self) { + pub(crate) fn on_incoming_pending_session_gracefully_closed(&mut self) { + self.connection_info.decr_in() + } + + /// Invoked when a pending session was closed. + pub(crate) fn on_incoming_pending_session_dropped( + &mut self, + remote_addr: SocketAddr, + err: &PendingSessionHandshakeError, + ) { + if err.is_fatal_protocol_error() { + self.ban_ip(remote_addr.ip()); + + if err.merits_discovery_ban() { + self.queued_actions + .push_back(PeerAction::DiscoveryBanIp { ip_addr: remote_addr.ip() }) + } + } + self.connection_info.decr_in() } @@ -189,12 +213,17 @@ impl PeersManager { } } - /// Bans the peer temporarily with the given timeout + /// Bans the peer temporarily with the configured ban timeout fn ban_peer(&mut self, peer_id: PeerId) { self.ban_list.ban_peer_until(peer_id, std::time::Instant::now() + self.ban_duration); self.queued_actions.push_back(PeerAction::BanPeer { peer_id }); } + /// Bans the IP temporarily with the configured ban timeout + fn ban_ip(&mut self, ip: IpAddr) { + self.ban_list.ban_ip_until(ip, std::time::Instant::now() + self.ban_duration); + } + /// Temporarily puts the peer in timeout fn backoff_peer(&mut self, peer_id: PeerId) { trace!(target: "net::peers", ?peer_id, "backing off"); @@ -307,7 +336,7 @@ impl PeersManager { // If the error is caused by a peer that should be banned from discovery if err.merits_discovery_ban() { - self.queued_actions.push_back(PeerAction::DiscoveryBan { + self.queued_actions.push_back(PeerAction::DiscoveryBanPeerId { peer_id: *peer_id, ip_addr: remote_addr.ip(), }) @@ -489,7 +518,9 @@ impl PeersManager { } if self.unban_interval.poll_tick(cx).is_ready() { - for peer_id in self.ban_list.evict_peers(std::time::Instant::now()) { + let (_, unbanned_peers) = self.ban_list.evict(std::time::Instant::now()); + + for peer_id in unbanned_peers { if let Some(peer) = self.peers.get_mut(&peer_id) { peer.unban(); } else { @@ -729,7 +760,9 @@ pub enum PeerAction { peer_id: PeerId, }, /// Ban the peer in discovery. - DiscoveryBan { peer_id: PeerId, ip_addr: IpAddr }, + DiscoveryBanPeerId { peer_id: PeerId, ip_addr: IpAddr }, + /// Ban the IP in discovery. + DiscoveryBanIp { ip_addr: IpAddr }, /// Ban the peer temporarily BanPeer { peer_id: PeerId }, /// Unban the peer temporarily @@ -829,7 +862,7 @@ mod test { PeersConfig, }; use reth_eth_wire::{ - error::{EthStreamError, P2PStreamError}, + error::{EthStreamError, P2PHandshakeError, P2PStreamError}, DisconnectReason, }; use reth_net_common::ban_list::BanList; @@ -1045,40 +1078,59 @@ mod test { } #[tokio::test] - async fn test_reputation_change() { - let peer = PeerId::random(); + async fn test_internally_closed_incoming() { let socket_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 2)), 8008); let mut peers = PeersManager::default(); - peers.add_discovered_node(peer, socket_addr); - match event!(peers) { - PeerAction::Connect { peer_id, remote_addr } => { - assert_eq!(peer_id, peer); - assert_eq!(remote_addr, socket_addr); - } - _ => unreachable!(), - } + assert!(peers.on_incoming_pending_session(socket_addr.ip()).is_ok()); + assert_eq!(peers.connection_info.num_inbound, 1); + peers.on_incoming_pending_session_rejected_internally(); + assert_eq!(peers.connection_info.num_inbound, 0); + } - peers.apply_reputation_change(&peer, ReputationChangeKind::BadProtocol); + #[tokio::test] + async fn test_closed_incoming() { + let socket_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 2)), 8008); + let mut peers = PeersManager::default(); - let p = peers.peers.get(&peer).unwrap(); - assert!(p.is_banned()); + assert!(peers.on_incoming_pending_session(socket_addr.ip()).is_ok()); + assert_eq!(peers.connection_info.num_inbound, 1); + peers.on_incoming_pending_session_gracefully_closed(); + assert_eq!(peers.connection_info.num_inbound, 0); + } - match event!(peers) { - PeerAction::Disconnect { peer_id, .. } => { - assert_eq!(peer_id, peer); - } - _ => { - unreachable!() - } - } + #[tokio::test] + async fn test_dropped_incoming() { + let socket_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 2)), 8008); + let ban_duration = Duration::from_millis(500); + let config = PeersConfig { ban_duration, ..Default::default() }; + let mut peers = PeersManager::new(config); - match event!(peers) { - PeerAction::BanPeer { peer_id } => { - assert_eq!(peer_id, peer); - } - _ => unreachable!(), - } + assert!(peers.on_incoming_pending_session(socket_addr.ip()).is_ok()); + assert_eq!(peers.connection_info.num_inbound, 1); + let err = PendingSessionHandshakeError::Eth(EthStreamError::P2PStreamError( + P2PStreamError::HandshakeError(P2PHandshakeError::Disconnected( + DisconnectReason::UselessPeer, + )), + )); + + peers.on_incoming_pending_session_dropped(socket_addr, &err); + assert_eq!(peers.connection_info.num_inbound, 0); + assert!(peers.ban_list.is_banned_ip(&socket_addr.ip())); + + assert!(peers.on_incoming_pending_session(socket_addr.ip()).is_err()); + + // unbanned after timeout + tokio::time::sleep(ban_duration).await; + + poll_fn(|cx| { + let _ = peers.poll(cx); + Poll::Ready(()) + }) + .await; + + assert!(!peers.ban_list.is_banned_ip(&socket_addr.ip())); + assert!(peers.on_incoming_pending_session(socket_addr.ip()).is_ok()); } #[tokio::test] @@ -1176,7 +1228,7 @@ mod test { let ban_list = BanList::new(HashSet::new(), vec![ip]); let config = PeersConfig::default().with_ban_list(ban_list); let mut peer_manager = PeersManager::new(config); - let a = peer_manager.on_inbound_pending_session(socket_addr.ip()); + let a = peer_manager.on_incoming_pending_session(socket_addr.ip()); // because we have no active peers this should be fine for testings match a { Ok(_) => panic!(), diff --git a/crates/net/network/src/state.rs b/crates/net/network/src/state.rs index 474a39f1ff..5e31338ed3 100644 --- a/crates/net/network/src/state.rs +++ b/crates/net/network/src/state.rs @@ -280,11 +280,13 @@ where self.queued_messages.push_back(StateAction::Disconnect { peer_id, reason }); } PeerAction::DisconnectBannedIncoming { peer_id } => { - // TODO: can IP ban self.state_fetcher.on_pending_disconnect(&peer_id); self.queued_messages.push_back(StateAction::Disconnect { peer_id, reason: None }); } - PeerAction::DiscoveryBan { peer_id, ip_addr } => self.ban_discovery(peer_id, ip_addr), + PeerAction::DiscoveryBanPeerId { peer_id, ip_addr } => { + self.ban_discovery(peer_id, ip_addr) + } + PeerAction::DiscoveryBanIp { ip_addr } => self.ban_ip_discovery(ip_addr), PeerAction::BanPeer { .. } => {} PeerAction::UnBanPeer { .. } => {} } diff --git a/crates/net/network/src/swarm.rs b/crates/net/network/src/swarm.rs index c5b0620e09..265bc1c00b 100644 --- a/crates/net/network/src/swarm.rs +++ b/crates/net/network/src/swarm.rs @@ -179,8 +179,9 @@ where return Some(SwarmEvent::TcpListenerClosed { remote_addr: address }) } ListenerEvent::Incoming { stream, remote_addr } => { + // ensure we can handle an incoming connection from this address if let Err(err) = - self.state_mut().peers_mut().on_inbound_pending_session(remote_addr.ip()) + self.state_mut().peers_mut().on_incoming_pending_session(remote_addr.ip()) { match err { InboundConnectionError::IpBanned => { @@ -202,6 +203,9 @@ where } Err(err) => { warn!(target: "net", ?err, "Incoming connection rejected"); + self.state_mut() + .peers_mut() + .on_incoming_pending_session_rejected_internally(); } } }