From b2abcddeda5d9da1801fabe256af22a9b63822f8 Mon Sep 17 00:00:00 2001 From: Matthias Seitz Date: Tue, 20 Dec 2022 22:19:06 +0100 Subject: [PATCH] fix(net): temporarily backoff busy peers (#548) * fix(net): temporarily backoff busy peers * chore: rustfmt --- crates/net/network/src/error.rs | 3 + crates/net/network/src/peers/manager.rs | 111 +++++++++++++++++++-- crates/net/network/src/peers/reputation.rs | 4 + 3 files changed, 107 insertions(+), 11 deletions(-) diff --git a/crates/net/network/src/error.rs b/crates/net/network/src/error.rs index 7457155498..dbe1a7e44c 100644 --- a/crates/net/network/src/error.rs +++ b/crates/net/network/src/error.rs @@ -50,6 +50,9 @@ pub(crate) fn is_fatal_protocol_error(err: &EthStreamError) -> bool { P2PStreamError::EmptyProtocolMessage | P2PStreamError::ParseVersionError(_) | P2PStreamError::Disconnected(DisconnectReason::UselessPeer) | + P2PStreamError::Disconnected( + DisconnectReason::IncompatibleP2PProtocolVersion + ) | P2PStreamError::MismatchedProtocolVersion { .. } ) } diff --git a/crates/net/network/src/peers/manager.rs b/crates/net/network/src/peers/manager.rs index 3b3c2a1346..d5475388de 100644 --- a/crates/net/network/src/peers/manager.rs +++ b/crates/net/network/src/peers/manager.rs @@ -18,6 +18,7 @@ use std::{ time::Duration, }; +use crate::peers::reputation::BACKOFF_REPUTATION_CHANGE; use thiserror::Error; use tokio::{ sync::{mpsc, oneshot}, @@ -91,6 +92,9 @@ pub(crate) struct PeersManager { unban_interval: Interval, /// How long to ban bad peers. ban_duration: Duration, + /// How long peers to which we could not connect for non-fatal reasons, e.g. + /// [`DisconnectReason::TooManyPeers`], are put in time out. 
+ backoff_duration: Duration, } impl PeersManager { @@ -102,9 +106,14 @@ impl PeersManager { reputation_weights, ban_list, ban_duration, + backoff_duration, } = config; let (manager_tx, handle_rx) = mpsc::unbounded_channel(); let now = Instant::now(); + + // We use half of the interval to decrease the max duration to `150%` in worst case + let unban_interval = ban_duration.min(backoff_duration) / 2; + Self { peers: Default::default(), manager_tx, @@ -115,11 +124,11 @@ impl PeersManager { now + refill_slots_interval, refill_slots_interval, ), - // Use half of ban duration for interval - unban_interval: tokio::time::interval_at(now + ban_duration, ban_duration / 2), + unban_interval: tokio::time::interval_at(now + unban_interval, unban_interval), connection_info, ban_list, ban_duration, + backoff_duration, } } @@ -196,6 +205,11 @@ impl PeersManager { self.queued_actions.push_back(PeerAction::BanPeer { peer_id }); } + /// Temporarily puts the peer in timeout + fn backoff_peer(&mut self, peer_id: PeerId) { + self.ban_list.ban_peer_until(peer_id, std::time::Instant::now() + self.backoff_duration); + } + /// Unbans the peer fn unban_peer(&mut self, peer_id: PeerId) { self.ban_list.unban_peer(&peer_id); @@ -288,11 +302,23 @@ impl PeersManager { ip_addr: remote_addr.ip(), }) } - } else if let Some(mut peer) = self.peers.get_mut(peer_id) { - self.connection_info.decr_state(peer.state); - peer.state = PeerConnectionState::Idle; - let reputation_change = self.reputation_weights.change(ReputationChangeKind::Dropped); - peer.reputation = peer.reputation.saturating_sub(reputation_change.as_i32()); + } else { + let reputation_change = if let Some(DisconnectReason::TooManyPeers) = + err.as_disconnected() + { + // The peer has signaled that it is currently unable to process any more + // connections, so we will hold off on attempting any new connections for a while + self.backoff_peer(*peer_id); + BACKOFF_REPUTATION_CHANGE.into() + } else { + 
self.reputation_weights.change(ReputationChangeKind::Dropped) + }; + + if let Some(mut peer) = self.peers.get_mut(peer_id) { + self.connection_info.decr_state(peer.state); + peer.state = PeerConnectionState::Idle; + peer.reputation = peer.reputation.saturating_add(reputation_change.as_i32()); + } + } } self.fill_outbound_slots(); @@ -440,10 +466,6 @@ impl PeersManager { } } - if self.refill_slots_interval.poll_tick(cx).is_ready() { - self.fill_outbound_slots(); - } - if self.unban_interval.poll_tick(cx).is_ready() { for peer_id in self.ban_list.evict_peers(std::time::Instant::now()) { if let Some(peer) = self.peers.get_mut(&peer_id) { @@ -455,6 +477,10 @@ impl PeersManager { } } + if self.refill_slots_interval.poll_tick(cx).is_ready() { + self.fill_outbound_slots(); + } + if self.queued_actions.is_empty() { return Poll::Pending } @@ -701,6 +727,9 @@ pub struct PeersConfig { pub ban_list: BanList, /// How long to ban bad peers. pub ban_duration: Duration, + /// How long to backoff peers that we failed to connect to for non-fatal reasons, such as + /// [`DisconnectReason::TooManyPeers`]. 
+ pub backoff_duration: Duration, } impl Default for PeersConfig { @@ -712,6 +741,8 @@ impl Default for PeersConfig { ban_list: Default::default(), // Ban peers for 12h ban_duration: Duration::from_secs(60 * 60 * 12), + // backoff peers for 1h + backoff_duration: Duration::from_secs(60 * 60), } } } @@ -786,6 +817,7 @@ mod test { net::{IpAddr, Ipv4Addr, SocketAddr}, pin::Pin, task::{Context, Poll}, + time::Duration, }; struct PeerActionFuture<'a> { @@ -844,6 +876,63 @@ mod test { .await; } + #[tokio::test] + async fn test_backoff_on_busy() { + let peer = PeerId::random(); + let socket_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 2)), 8008); + + let backoff_duration = Duration::from_secs(3); + let config = PeersConfig { backoff_duration, ..Default::default() }; + let mut peers = PeersManager::new(config); + peers.add_discovered_node(peer, socket_addr); + + match event!(peers) { + PeerAction::Connect { peer_id, .. } => { + assert_eq!(peer_id, peer); + } + _ => unreachable!(), + } + + poll_fn(|cx| { + assert!(peers.poll(cx).is_pending()); + Poll::Ready(()) + }) + .await; + + peers.on_connection_dropped( + &socket_addr, + &peer, + &EthStreamError::P2PStreamError(P2PStreamError::Disconnected( + DisconnectReason::TooManyPeers, + )), + ); + + poll_fn(|cx| { + assert!(peers.poll(cx).is_pending()); + Poll::Ready(()) + }) + .await; + + assert!(peers.ban_list.is_banned_peer(&peer)); + assert!(peers.peers.get(&peer).is_some()); + + tokio::time::sleep(backoff_duration).await; + + match event!(peers) { + PeerAction::UnBanPeer { peer_id, .. } => { + assert_eq!(peer_id, peer); + } + _ => unreachable!(), + } + + match event!(peers) { + PeerAction::Connect { peer_id, .. 
} => { + assert_eq!(peer_id, peer); + } + _ => unreachable!(), + } + } + #[tokio::test] async fn test_ban_on_drop() { let peer = PeerId::random(); diff --git a/crates/net/network/src/peers/reputation.rs b/crates/net/network/src/peers/reputation.rs index 97493762e3..f7b4de6876 100644 --- a/crates/net/network/src/peers/reputation.rs +++ b/crates/net/network/src/peers/reputation.rs @@ -27,6 +27,10 @@ const BAD_MESSAGE_REPUTATION_CHANGE: i32 = 8 * REPUTATION_UNIT; /// The reputation change to apply to a peer which violates protocol rules: minimal reputation const BAD_PROTOCOL_REPUTATION_CHANGE: i32 = i32::MIN; +/// A reputation change to apply to backoff the peer. This has the same effect as marking the peer +/// as banned. +pub(crate) const BACKOFF_REPUTATION_CHANGE: i32 = i32::MIN; + /// Returns `true` if the given reputation is below the [`BANNED_REPUTATION`] threshold #[inline] pub(crate) fn is_banned_reputation(reputation: i32) -> bool {