From c1f124d3e3e2390062db5cfd74c885784602be8f Mon Sep 17 00:00:00 2001 From: Matthias Seitz Date: Fri, 16 Dec 2022 18:22:25 +0100 Subject: [PATCH] feat(net): temporarily ban bad peers (#492) * feat(net): temporarily ban bad peers * use half duration interval * Update crates/net/network/src/peers/manager.rs Co-authored-by: Bjerg * fix bad test Co-authored-by: Bjerg --- crates/net/common/src/ban_list.rs | 15 ++ crates/net/network/src/fetch/mod.rs | 11 +- crates/net/network/src/manager.rs | 6 +- crates/net/network/src/peers/manager.rs | 297 ++++++++++++++++++--- crates/net/network/src/peers/reputation.rs | 9 + crates/net/network/src/state.rs | 2 + 6 files changed, 301 insertions(+), 39 deletions(-) diff --git a/crates/net/common/src/ban_list.rs b/crates/net/common/src/ban_list.rs index 1ef91374bc..6d162b4b09 100644 --- a/crates/net/common/src/ban_list.rs +++ b/crates/net/common/src/ban_list.rs @@ -32,6 +32,21 @@ impl BanList { Self { banned_ips, banned_peers } } + /// Removes all peers that are no longer banned. + pub fn evict_peers(&mut self, now: Instant) -> Vec { + let mut evicted = Vec::new(); + self.banned_peers.retain(|peer, until| { + if let Some(until) = until { + if now > *until { + evicted.push(*peer); + return false + } + } + true + }); + evicted + } + /// Removes all entries that should no longer be banned. pub fn evict(&mut self, now: Instant) { self.banned_ips.retain(|_, until| { diff --git a/crates/net/network/src/fetch/mod.rs b/crates/net/network/src/fetch/mod.rs index 7b28a7cea1..871b4f223c 100644 --- a/crates/net/network/src/fetch/mod.rs +++ b/crates/net/network/src/fetch/mod.rs @@ -23,6 +23,8 @@ pub use client::FetchClient; /// /// This type is hooked into the staged sync pipeline and delegates download request to available /// peers and sends the response once ready. +/// +/// This type maintains a list of connected peers that are available for requests. 
pub struct StateFetcher { /// Currently active [`GetBlockHeaders`] requests inflight_headers_requests: @@ -30,7 +32,7 @@ pub struct StateFetcher { /// Currently active [`GetBlockBodies`] requests inflight_bodies_requests: HashMap, PeerRequestResult>>>, - /// The list of available peers for requests. + /// The list of _available_ peers for requests. peers: HashMap, /// The handle to the peers manager peers_handle: PeersHandle, @@ -63,6 +65,9 @@ impl StateFetcher { self.peers.insert(peer_id, Peer { state: PeerState::Idle, best_hash, best_number }); } + /// Removes the peer from the peer list, after which it is no longer available for future + /// requests. + /// /// Invoked when an active session was closed. /// /// This cancels als inflight request and sends an error to the receiver. @@ -97,7 +102,7 @@ impl StateFetcher { } } - /// Returns the next idle peer that's ready to accept a request + /// Returns the _next_ idle peer that's ready to accept a request. fn next_peer(&mut self) -> Option<(&PeerId, &mut Peer)> { self.peers.iter_mut().find(|(_, peer)| peer.state.is_idle()) } @@ -181,7 +186,7 @@ impl StateFetcher { /// Returns a new followup request for the peer. /// - /// Caution: this expects that the peer is _not_ closed + /// Caution: this expects that the peer is _not_ closed. 
fn followup_request(&mut self, peer_id: PeerId) -> Option { let req = self.queued_requests.pop_front()?; let req = self.prepare_block_request(peer_id, req); diff --git a/crates/net/network/src/manager.rs b/crates/net/network/src/manager.rs index 124682cb39..7dd228316d 100644 --- a/crates/net/network/src/manager.rs +++ b/crates/net/network/src/manager.rs @@ -603,9 +603,9 @@ where ?error, "Outgoing pending session failed" ); - let swarm = this.swarm.state_mut().peers_mut(); - swarm.on_closed_outgoing_pending_session(); - swarm.apply_reputation_change(&peer_id, ReputationChangeKind::FailedToConnect); + let peers = this.swarm.state_mut().peers_mut(); + peers.on_closed_outgoing_pending_session(&peer_id); + peers.apply_reputation_change(&peer_id, ReputationChangeKind::FailedToConnect); if error.map(|err| err.merits_discovery_ban()).unwrap_or_default() { this.swarm.state_mut().ban_discovery(peer_id, remote_addr.ip()); diff --git a/crates/net/network/src/peers/manager.rs b/crates/net/network/src/peers/manager.rs index 960ca15ae3..d22894cbea 100644 --- a/crates/net/network/src/peers/manager.rs +++ b/crates/net/network/src/peers/manager.rs @@ -1,6 +1,9 @@ use crate::{ error::{error_merits_discovery_ban, is_fatal_protocol_error}, - peers::{reputation::BANNED_REPUTATION, ReputationChangeKind, ReputationChangeWeights}, + peers::{ + reputation::{is_banned_reputation, DEFAULT_REPUTATION}, + ReputationChangeKind, ReputationChangeWeights, + }, }; use futures::StreamExt; use reth_eth_wire::{error::EthStreamError, DisconnectReason}; @@ -19,7 +22,7 @@ use tokio::{ time::{Instant, Interval}, }; use tokio_stream::wrappers::UnboundedReceiverStream; -use tracing::{debug, trace}; +use tracing::trace; /// A communication channel to the [`PeersManager`] to apply manual changes to the peer set. 
#[derive(Clone, Debug)] @@ -82,14 +85,24 @@ pub(crate) struct PeersManager { connection_info: ConnectionInfo, /// Tracks unwanted ips/peer ids, ban_list: BanList, + /// Interval at which to check for peers to unban. + unban_interval: Interval, + /// How long to ban bad peers. + ban_duration: Duration, } impl PeersManager { /// Create a new instance with the given config pub(crate) fn new(config: PeersConfig) -> Self { - let PeersConfig { refill_slots_interval, connection_info, reputation_weights, ban_list } = - config; + let PeersConfig { + refill_slots_interval, + connection_info, + reputation_weights, + ban_list, + ban_duration, + } = config; let (manager_tx, handle_rx) = mpsc::unbounded_channel(); + let now = Instant::now(); Self { peers: Default::default(), manager_tx, @@ -97,11 +110,14 @@ impl PeersManager { queued_actions: Default::default(), reputation_weights, refill_slots_interval: tokio::time::interval_at( - Instant::now() + refill_slots_interval, + now + refill_slots_interval, refill_slots_interval, ), + // Use half of ban duration for interval + unban_interval: tokio::time::interval_at(now + ban_duration, ban_duration / 2), connection_info, ban_list, + ban_duration, } } @@ -135,8 +151,11 @@ impl PeersManager { self.connection_info.decr_in() } - /// Invoked when a pending session was closed. - pub(crate) fn on_closed_outgoing_pending_session(&mut self) { + /// Invoked when a pending outgoing session was closed. 
+ pub(crate) fn on_closed_outgoing_pending_session(&mut self, peer_id: &PeerId) { + if let Some(mut peer) = self.peers.get_mut(peer_id) { + peer.state = PeerConnectionState::Idle; + } self.connection_info.decr_out() } @@ -169,29 +188,40 @@ impl PeersManager { } } + /// Bans the peer temporarily for the configured `ban_duration` + fn ban_peer(&mut self, peer_id: PeerId) { + self.ban_list.ban_peer_until(peer_id, std::time::Instant::now() + self.ban_duration); + self.queued_actions.push_back(PeerAction::BanPeer { peer_id }); + } + + /// Unbans the peer + fn unban_peer(&mut self, peer_id: PeerId) { + self.ban_list.unban_peer(&peer_id); + self.queued_actions.push_back(PeerAction::UnBanPeer { peer_id }); + } + /// Apply the corresponding reputation change to the given peer pub(crate) fn apply_reputation_change(&mut self, peer_id: &PeerId, rep: ReputationChangeKind) { let reputation_change = self.reputation_weights.change(rep); - let should_disconnect = if let Some(mut peer) = self.peers.get_mut(peer_id) { - // we add reputation since negative reputation change decrease total reputation - peer.reputation = peer.reputation.saturating_add(reputation_change.as_i32()); - trace!(target: "net::peers", repuation=%peer.reputation, banned=%peer.is_banned(), "applied reputation change"); - let should_disconnect = peer.state.is_connected() && peer.is_banned(); - - if should_disconnect { - debug!(target: "net::peers", repuation=%peer.reputation, "disconnecting peer on reputation change"); - peer.state.disconnect(); - } - - should_disconnect + let outcome = if let Some(peer) = self.peers.get_mut(peer_id) { + peer.apply_reputation(reputation_change.as_i32()) } else { - false + return }; - if should_disconnect { - // start the disconnect process - self.queued_actions - .push_back(PeerAction::Disconnect { peer_id: *peer_id, reason: None }) + match outcome { + ReputationChangeOutcome::None => {} + ReputationChangeOutcome::Ban => { + self.ban_peer(*peer_id); + } + ReputationChangeOutcome::Unban => 
self.unban_peer(*peer_id), + ReputationChangeOutcome::DisconnectAndBan => { + self.queued_actions.push_back(PeerAction::Disconnect { + peer_id: *peer_id, + reason: Some(DisconnectReason::DisconnectRequested), + }); + self.ban_peer(*peer_id); + } } } @@ -375,6 +405,17 @@ impl PeersManager { self.fill_outbound_slots(); } + if self.unban_interval.poll_tick(cx).is_ready() { + for peer_id in self.ban_list.evict_peers(std::time::Instant::now()) { + if let Some(peer) = self.peers.get_mut(&peer_id) { + peer.unban(); + } else { + continue + } + self.queued_actions.push_back(PeerAction::UnBanPeer { peer_id }); + } + } + if self.queued_actions.is_empty() { return Poll::Pending } @@ -382,6 +423,12 @@ impl PeersManager { } } +impl Default for PeersManager { + fn default() -> Self { + PeersManager::new(Default::default()) + } +} + /// Tracks stats about connected nodes #[derive(Debug)] pub struct ConnectionInfo { @@ -439,8 +486,8 @@ impl Default for ConnectionInfo { } } -#[derive(Debug, Clone)] /// Tracks info about a single peer. +#[derive(Debug, Clone)] pub struct Peer { /// Where to reach the peer addr: SocketAddr, @@ -460,18 +507,60 @@ impl Peer { } fn with_state(addr: SocketAddr, state: PeerConnectionState) -> Self { - Self { addr, state, reputation: 0, fork_id: None } + Self { addr, state, reputation: DEFAULT_REPUTATION, fork_id: None } + } + + /// Applies a reputation change to the peer and returns what action should be taken. 
+ fn apply_reputation(&mut self, reputation: i32) -> ReputationChangeOutcome { + let previous = self.reputation; + // we add reputation since negative reputation change decrease total reputation + self.reputation = previous.saturating_add(reputation); + + trace!(target: "net::peers", repuation=%self.reputation, banned=%self.is_banned(), "applied reputation change"); + + if self.state.is_connected() && self.is_banned() { + self.state.disconnect(); + return ReputationChangeOutcome::DisconnectAndBan + } + + if self.is_banned() && !is_banned_reputation(previous) { + return ReputationChangeOutcome::Ban + } + + if !self.is_banned() && is_banned_reputation(previous) { + return ReputationChangeOutcome::Unban + } + + ReputationChangeOutcome::None } /// Returns true if the peer's reputation is below the banned threshold. #[inline] fn is_banned(&self) -> bool { - self.reputation < BANNED_REPUTATION + is_banned_reputation(self.reputation) + } + + /// Unbans the peer by resetting its reputation + #[inline] + fn unban(&mut self) { + self.reputation = DEFAULT_REPUTATION } } +/// Outcomes when a reputation change is applied to a peer +enum ReputationChangeOutcome { + /// Nothing to do. + None, + /// Ban the peer. + Ban, + /// Ban and disconnect + DisconnectAndBan, + /// Unban the peer + Unban, +} + /// Represents the kind of connection established to the peer, if any -#[derive(Debug, Clone, Copy, Default)] +#[derive(Debug, Clone, Copy, Default, Eq, PartialEq)] enum PeerConnectionState { /// Not connected currently. #[default] @@ -546,19 +635,25 @@ pub enum PeerAction { }, /// Ban the peer in discovery. 
DiscoveryBan { peer_id: PeerId, ip_addr: IpAddr }, + /// Ban the peer temporarily + BanPeer { peer_id: PeerId }, + /// Unban the peer + UnBanPeer { peer_id: PeerId }, } /// Config type for initiating a [`PeersManager`] instance #[derive(Debug)] pub struct PeersConfig { - /// How often to recheck free slots for outbound connections + /// How often to recheck free slots for outbound connections. pub refill_slots_interval: Duration, - /// Restrictions on connections + /// Restrictions on connections. pub connection_info: ConnectionInfo, - /// How to weigh reputation changes + /// How to weigh reputation changes. pub reputation_weights: ReputationChangeWeights, - /// Restrictions on PeerIds and Ips + /// Restrictions on PeerIds and Ips. pub ban_list: BanList, + /// How long to ban bad peers. + pub ban_duration: Duration, } impl Default for PeersConfig { @@ -568,6 +663,8 @@ impl Default for PeersConfig { connection_info: Default::default(), reputation_weights: Default::default(), ban_list: Default::default(), + // Ban peers for 12h + ban_duration: Duration::from_secs(60 * 60 * 12), } } } @@ -626,7 +723,7 @@ mod test { use crate::{ peers::{ manager::{ConnectionInfo, PeerConnectionState}, - PeerAction, + PeerAction, ReputationChangeKind, }, PeersConfig, }; @@ -634,9 +731,143 @@ mod test { use reth_primitives::{PeerId, H512}; use std::{ collections::HashSet, + future::{poll_fn, Future}, net::{IpAddr, Ipv4Addr, SocketAddr}, + pin::Pin, + task::{Context, Poll}, }; + struct PeerActionFuture<'a> { + peers: &'a mut PeersManager, + } + + impl<'a> Future for PeerActionFuture<'a> { + type Output = PeerAction; + + fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + self.get_mut().peers.poll(cx) + } + } + + macro_rules! 
event { + ($peers:expr) => { + PeerActionFuture { peers: &mut $peers }.await + }; + } + + #[tokio::test] + async fn test_insert() { + let peer = PeerId::random(); + let socket_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 2)), 8008); + let mut peers = PeersManager::default(); + peers.add_discovered_node(peer, socket_addr); + + match event!(peers) { + PeerAction::Connect { peer_id, remote_addr } => { + assert_eq!(peer_id, peer); + assert_eq!(remote_addr, socket_addr); + } + _ => unreachable!(), + } + } + + #[tokio::test] + async fn test_ban() { + let peer = PeerId::random(); + let socket_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 2)), 8008); + let mut peers = PeersManager::default(); + peers.ban_peer(peer); + peers.add_discovered_node(peer, socket_addr); + + match event!(peers) { + PeerAction::BanPeer { peer_id } => { + assert_eq!(peer_id, peer); + } + _ => unreachable!(), + } + + poll_fn(|cx| { + assert!(peers.poll(cx).is_pending()); + Poll::Ready(()) + }) + .await; + } + + #[tokio::test] + async fn test_reputation_change() { + let peer = PeerId::random(); + let socket_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 2)), 8008); + let mut peers = PeersManager::default(); + peers.add_discovered_node(peer, socket_addr); + + match event!(peers) { + PeerAction::Connect { peer_id, remote_addr } => { + assert_eq!(peer_id, peer); + assert_eq!(remote_addr, socket_addr); + } + _ => unreachable!(), + } + + peers.apply_reputation_change(&peer, ReputationChangeKind::BadProtocol); + + let p = peers.peers.get(&peer).unwrap(); + assert!(p.is_banned()); + + match event!(peers) { + PeerAction::Disconnect { peer_id, .. 
} => { + assert_eq!(peer_id, peer); + } + _ => { + unreachable!() + } + } + + match event!(peers) { + PeerAction::BanPeer { peer_id } => { + assert_eq!(peer_id, peer); + } + _ => unreachable!(), + } + } + + #[tokio::test] + async fn test_reputation_change_connected() { + let peer = PeerId::random(); + let socket_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 2)), 8008); + let mut peers = PeersManager::default(); + peers.add_discovered_node(peer, socket_addr); + + match event!(peers) { + PeerAction::Connect { peer_id, remote_addr } => { + assert_eq!(peer_id, peer); + assert_eq!(remote_addr, socket_addr); + } + _ => unreachable!(), + } + + let p = peers.peers.get_mut(&peer).unwrap(); + assert_eq!(p.state, PeerConnectionState::Out); + + peers.apply_reputation_change(&peer, ReputationChangeKind::BadProtocol); + + let p = peers.peers.get(&peer).unwrap(); + assert_eq!(p.state, PeerConnectionState::DisconnectingOut); + assert!(p.is_banned()); + + peers.on_disconnected(&peer); + + let p = peers.peers.get(&peer).unwrap(); + assert_eq!(p.state, PeerConnectionState::Idle); + assert!(p.is_banned()); + + match event!(peers) { + PeerAction::Disconnect { peer_id, .. } => { + assert_eq!(peer_id, peer); + } + _ => unreachable!(), + } + } + #[tokio::test] async fn test_discovery_ban_list() { let ip = IpAddr::V4(Ipv4Addr::new(127, 0, 1, 2)); diff --git a/crates/net/network/src/peers/reputation.rs b/crates/net/network/src/peers/reputation.rs index 9bfb93a98b..97493762e3 100644 --- a/crates/net/network/src/peers/reputation.rs +++ b/crates/net/network/src/peers/reputation.rs @@ -3,6 +3,9 @@ /// The type that tracks the reputation score. 
pub(crate) type Reputation = i32; +/// The default reputation of a peer +pub(crate) const DEFAULT_REPUTATION: Reputation = 0; + /// The minimal unit we're measuring reputation const REPUTATION_UNIT: i32 = -1024; @@ -24,6 +27,12 @@ const BAD_MESSAGE_REPUTATION_CHANGE: i32 = 8 * REPUTATION_UNIT; /// The reputation change to apply to a peer which violates protocol rules: minimal reputation const BAD_PROTOCOL_REPUTATION_CHANGE: i32 = i32::MIN; +/// Returns `true` if the given reputation is below the [`BANNED_REPUTATION`] threshold +#[inline] +pub(crate) fn is_banned_reputation(reputation: i32) -> bool { + reputation < BANNED_REPUTATION +} + /// Various kinds of reputation changes. #[derive(Debug, Copy, Clone)] pub enum ReputationChangeKind { diff --git a/crates/net/network/src/state.rs b/crates/net/network/src/state.rs index a516342672..615182589b 100644 --- a/crates/net/network/src/state.rs +++ b/crates/net/network/src/state.rs @@ -285,6 +285,8 @@ where self.queued_messages.push_back(StateAction::Disconnect { peer_id, reason: None }); } PeerAction::DiscoveryBan { peer_id, ip_addr } => self.ban_discovery(peer_id, ip_addr), + PeerAction::BanPeer { .. } => {} + PeerAction::UnBanPeer { .. } => {} } }