From 139efee5999689304ffa756f8efe9917b141ec87 Mon Sep 17 00:00:00 2001 From: Matthias Seitz Date: Sat, 12 Nov 2022 08:41:15 +0100 Subject: [PATCH] feat(net): impl peer management (#194) --- crates/net/network/src/peers.rs | 270 ++++++++++++++++++++++++++++---- crates/net/network/src/state.rs | 16 +- 2 files changed, 251 insertions(+), 35 deletions(-) diff --git a/crates/net/network/src/peers.rs b/crates/net/network/src/peers.rs index 44fa6ad50d..d11954844c 100644 --- a/crates/net/network/src/peers.rs +++ b/crates/net/network/src/peers.rs @@ -1,6 +1,5 @@ -use reth_discv4::NodeId; - use futures::StreamExt; +use reth_discv4::NodeId; use std::{ collections::{hash_map::Entry, HashMap, VecDeque}, net::SocketAddr, @@ -13,8 +12,15 @@ use tokio::{ }; use tokio_stream::wrappers::UnboundedReceiverStream; +/// The reputation value below which new connection from/to peers are rejected. +pub const BANNED_REPUTATION: i32 = 0; + +/// The reputation change to apply to a node that dropped the connection. +const REMOTE_DISCONNECT_REPUTATION_CHANGE: i32 = -100; + /// A communication channel to the [`PeersManager`] to apply changes to the peer set. pub struct PeersHandle { + /// Sender half of command channel back to the [`PeersManager`] manager_tx: mpsc::UnboundedSender, } @@ -26,13 +32,13 @@ pub struct PeersHandle { /// The [`PeersManager`] will be notified on peer related changes pub(crate) struct PeersManager { /// All peers known to the network - peers: HashMap, + peers: HashMap, /// Copy of the receiver half, so new [`PeersHandle`] can be created on demand. manager_tx: mpsc::UnboundedSender, /// Receiver half of the command channel. handle_rx: UnboundedReceiverStream, /// Buffered actions until the manager is polled. - actions: VecDeque, + queued_actions: VecDeque, /// Interval for triggering connections if there are free slots. refill_slots_interval: Interval, /// Tracks current slot stats. @@ -48,7 +54,7 @@ impl PeersManager { peers: Default::default(), manager_tx, handle_rx: UnboundedReceiverStream::new(handle_rx), - actions: Default::default(), + queued_actions: Default::default(), refill_slots_interval: tokio::time::interval_at( Instant::now() + refill_slots_interval, refill_slots_interval, @@ -57,27 +63,116 @@ impl PeersManager { } } - /// Returns a new [`PeersHandle`] that can send commands to this type + /// Returns a new [`PeersHandle`] that can send commands to this type. pub(crate) fn handle(&self) -> PeersHandle { PeersHandle { manager_tx: self.manager_tx.clone() } } - pub(crate) fn add_discovered_node(&mut self, node: NodeId, addr: SocketAddr) { - match self.peers.entry(node) { - Entry::Occupied(_) => {} + /// Called when a new _incoming_ active session was established to the given peer. + /// + /// This will update the state of the peer if not yet tracked. + /// + /// If the reputation of the peer is below the `BANNED_REPUTATION` threshold, a disconnect will + /// be scheduled. + pub(crate) fn on_active_session(&mut self, peer_id: NodeId, addr: SocketAddr) { + match self.peers.entry(peer_id) { + Entry::Occupied(mut entry) => { + let value = entry.get_mut(); + if value.is_banned() { + self.queued_actions.push_back(PeerAction::DisconnectBannedIncoming { peer_id }); + return + } + value.state = PeerConnectionState::In; + } Entry::Vacant(entry) => { - entry.insert(Node::new(addr)); + entry.insert(Peer::with_state(addr, PeerConnectionState::In)); + } + } + + // keep track of new connection + self.connection_info.inc_in(); + } + + /// Called when a session to a peer was disconnected. + /// + /// Accepts an additional [`ReputationChange`] value to apply to the peer. + pub(crate) fn on_disconnected(&mut self, peer: NodeId, reputation_change: ReputationChange) { + if let Some(mut peer) = self.peers.get_mut(&peer) { + self.connection_info.decr_state(peer.state); + peer.state = PeerConnectionState::Idle; + peer.reputation -= reputation_change.0; + } + } + + /// Called for a newly discovered peer. + /// + /// If the peer already exists, then the address will e updated. If the addresses differ, the + /// old address is returned + pub(crate) fn add_discovered_node(&mut self, peer_id: NodeId, addr: SocketAddr) { + match self.peers.entry(peer_id) { + Entry::Occupied(mut entry) => { + let node = entry.get_mut(); + node.addr = addr; + } + Entry::Vacant(entry) => { + entry.insert(Peer::new(addr)); } } } - pub(crate) fn remove_discovered_node(&mut self, _node: NodeId) {} + /// Removes the tracked node from the set. + pub(crate) fn remove_discovered_node(&mut self, peer_id: NodeId) { + if let Some(entry) = self.peers.remove(&peer_id) { + if entry.state.is_connected() { + self.connection_info.decr_state(entry.state); + self.queued_actions.push_back(PeerAction::Disconnect { peer_id }) + } + } + } + + /// Returns the idle peer with the highest reputation. + /// + /// Returns `None` if no peer is available. + fn best_unconnected(&mut self) -> Option<(NodeId, &mut Peer)> { + self.peers + .iter_mut() + .filter(|(_, peer)| peer.state.is_unconnected()) + .fold(None::<(&NodeId, &mut Peer)>, |mut best_peer, candidate| { + if let Some(best_peer) = best_peer.take() { + if best_peer.1.reputation >= candidate.1.reputation { + return Some(best_peer) + } + } + Some(candidate) + }) + .map(|(id, peer)| (*id, peer)) + } /// If there's capacity for new outbound connections, this will queue new /// [`PeerAction::Connect`] actions. + /// + /// New connections are only initiated, if slots are available and appropriate peers are + /// available. fn fill_outbound_slots(&mut self) { - // This checks if there are free slots for new outbound connections available that can be - // filled + // as long as there a slots available try to fill them with the best peers + while self.connection_info.has_out_capacity() { + let action = { + let (peer_id, peer) = match self.best_unconnected() { + Some(peer) => peer, + _ => break, + }; + + // If best peer does not meet reputation threshold exit immediately. + if peer.is_banned() { + break + } + peer.state = PeerConnectionState::Out; + PeerAction::Connect { peer_id, remote_addr: peer.addr } + }; + + self.connection_info.inc_out(); + self.queued_actions.push_back(action); + } } /// Advances the state. @@ -87,15 +182,25 @@ impl PeersManager { pub fn poll(&mut self, cx: &mut Context<'_>) -> Poll { loop { // drain buffered actions - if let Some(action) = self.actions.pop_front() { + if let Some(action) = self.queued_actions.pop_front() { return Poll::Ready(action) } + + while let Poll::Ready(Some(cmd)) = self.handle_rx.poll_next_unpin(cx) { + match cmd { + PeerCommand::Add { peer_id, addr } => { + self.add_discovered_node(peer_id, addr); + } + PeerCommand::Remove(peer) => self.remove_discovered_node(peer), + } + } + if self.refill_slots_interval.poll_tick(cx).is_ready() { self.fill_outbound_slots(); } - while let Poll::Ready(Some(_cmd)) = self.handle_rx.poll_next_unpin(cx) { - // TODO handle incoming command + if self.queued_actions.is_empty() { + return Poll::Pending } } } @@ -114,27 +219,126 @@ pub struct ConnectionInfo { max_inbound: usize, } -/// Tracks info about a single node. -struct Node { - /// Where to reach the node - addr: SocketAddr, - /// Reputation of the node. - reputation: i32, +// === impl ConnectionInfo === + +impl ConnectionInfo { + /// Returns `true` if there's still capacity for a new outgoing connection. + fn has_out_capacity(&self) -> bool { + self.num_outbound < self.max_outbound + } + + fn decr_state(&mut self, state: PeerConnectionState) { + match state { + PeerConnectionState::Idle => {} + PeerConnectionState::In => self.decr_in(), + PeerConnectionState::Out => self.decr_out(), + } + } + + fn decr_out(&mut self) { + self.num_outbound -= 1; + } + + fn inc_out(&mut self) { + self.num_outbound += 1; + } + + fn inc_in(&mut self) { + self.num_inbound += 1; + } + + fn decr_in(&mut self) { + self.num_inbound -= 1; + } } -// === impl Node === +/// Tracks info about a single peer. +struct Peer { + /// Where to reach the peer + addr: SocketAddr, + /// Reputation of the peer. + reputation: i32, + /// The state of the connection, if any. + state: PeerConnectionState, +} -impl Node { +// === impl Peer === + +impl Peer { fn new(addr: SocketAddr) -> Self { - Self { addr, reputation: 0 } + Self::with_state(addr, Default::default()) + } + + fn with_state(addr: SocketAddr, state: PeerConnectionState) -> Self { + Self { addr, state, reputation: 0 } + } + + /// Returns true if the peer's reputation is below the banned threshold. + #[inline] + fn is_banned(&self) -> bool { + self.reputation < BANNED_REPUTATION + } +} + +/// Represents the kind of connection established to the peer, if any +#[derive(Debug, Clone, Copy, Default)] +enum PeerConnectionState { + /// Not connected currently. + #[default] + Idle, + /// Connected via incoming connection. + In, + /// Connected via outgoing connection. + Out, +} + +// === impl PeerConnectionState === + +impl PeerConnectionState { + /// Returns whether we're currently connected with this peer + #[inline] + fn is_connected(&self) -> bool { + matches!(self, PeerConnectionState::In | PeerConnectionState::Out) + } + + /// Returns if there's currently no connection to that peer. + #[inline] + fn is_unconnected(&self) -> bool { + matches!(self, PeerConnectionState::Idle) + } +} + +/// Represents a change in a peer's reputation. +#[derive(Debug, Copy, Clone, Default)] +pub(crate) struct ReputationChange(i32); + +// === impl ReputationChange === + +impl ReputationChange { + /// Apply no reputation change. + pub(crate) const fn none() -> Self { + Self(0) + } + + /// Reputation change for a peer that dropped the connection. + pub(crate) const fn dropped() -> Self { + Self(REMOTE_DISCONNECT_REPUTATION_CHANGE) } } /// Commands the [`PeersManager`] listens for. -pub enum PeerCommand { - Add(NodeId), +pub(crate) enum PeerCommand { + /// Command for manually add + Add { + /// Identifier of the peer. + peer_id: NodeId, + /// The address of the peer + addr: SocketAddr, + }, + /// Remove a peer from the set + /// + /// If currently connected this will disconnect the sessin Remove(NodeId), - // TODO reputation change } /// Actions the peer manager can trigger. @@ -143,12 +347,18 @@ pub enum PeerAction { /// Start a new connection to a peer. Connect { /// The peer to connect to. - node_id: NodeId, + peer_id: NodeId, /// Where to reach the node remote_addr: SocketAddr, }, /// Disconnect an existing connection. - Disconnect { node_id: NodeId }, + Disconnect { peer_id: NodeId }, + /// Disconnect an existing incoming connection, because the peers reputation is below the + /// banned threshold. + DisconnectBannedIncoming { + /// Peer id of the established connection. + peer_id: NodeId, + }, } /// Config type for initiating a [`PeersManager`] instance diff --git a/crates/net/network/src/state.rs b/crates/net/network/src/state.rs index c8f7870727..30e0eac43c 100644 --- a/crates/net/network/src/state.rs +++ b/crates/net/network/src/state.rs @@ -132,12 +132,18 @@ where /// Event hook for new actions derived from the peer management set. fn on_peer_action(&mut self, action: PeerAction) { match action { - PeerAction::Connect { node_id, remote_addr } => { - self.queued_messages.push_back(StateAction::Connect { node_id, remote_addr }); + PeerAction::Connect { peer_id, remote_addr } => { + self.queued_messages + .push_back(StateAction::Connect { node_id: peer_id, remote_addr }); } - PeerAction::Disconnect { node_id } => { - self.state_fetcher.on_pending_disconnect(&node_id); - self.queued_messages.push_back(StateAction::Disconnect { node_id }); + PeerAction::Disconnect { peer_id } => { + self.state_fetcher.on_pending_disconnect(&peer_id); + self.queued_messages.push_back(StateAction::Disconnect { node_id: peer_id }); + } + PeerAction::DisconnectBannedIncoming { peer_id } => { + // TODO: can IP ban + self.state_fetcher.on_pending_disconnect(&peer_id); + self.queued_messages.push_back(StateAction::Disconnect { node_id: peer_id }); } } }