From 8966daad9d029e97a77918ba6a8a211caf7f33d3 Mon Sep 17 00:00:00 2001 From: Matthias Seitz Date: Thu, 24 Nov 2022 10:14:50 +0100 Subject: [PATCH] feat(net): reputation system and peer reporting (#239) * refactor: move to module * feat(net): reputation system and peer reporting * be specific in test * use saturating sub --- crates/interfaces/src/p2p/error.rs | 2 + crates/net/network/src/manager.rs | 57 +++++++- crates/net/network/src/message.rs | 18 ++- crates/net/network/src/network.rs | 16 ++- .../src/{peers.rs => peers/manager.rs} | 118 +++++++++------ crates/net/network/src/peers/mod.rs | 9 ++ crates/net/network/src/peers/reputation.rs | 136 ++++++++++++++++++ crates/net/network/src/session/active.rs | 26 ++-- crates/net/network/src/session/handle.rs | 13 +- crates/net/network/src/session/mod.rs | 56 +++++++- crates/net/network/src/state.rs | 5 + crates/net/network/src/swarm.rs | 21 ++- crates/net/network/src/transactions.rs | 28 ++-- crates/net/network/tests/it/connect.rs | 6 +- 14 files changed, 424 insertions(+), 87 deletions(-) rename crates/net/network/src/{peers.rs => peers/manager.rs} (77%) create mode 100644 crates/net/network/src/peers/mod.rs create mode 100644 crates/net/network/src/peers/reputation.rs diff --git a/crates/interfaces/src/p2p/error.rs b/crates/interfaces/src/p2p/error.rs index d7debf954f..86087d29d2 100644 --- a/crates/interfaces/src/p2p/error.rs +++ b/crates/interfaces/src/p2p/error.rs @@ -17,6 +17,8 @@ pub enum RequestError { UnsupportedCapability, #[error("Request timed out while awaiting response.")] Timeout, + #[error("Received bad response.")] + BadResponse, } impl From> for RequestError { diff --git a/crates/net/network/src/manager.rs b/crates/net/network/src/manager.rs index 23858cf2e0..9d7bd18c8b 100644 --- a/crates/net/network/src/manager.rs +++ b/crates/net/network/src/manager.rs @@ -23,7 +23,7 @@ use crate::{ listener::ConnectionListener, message::{NewBlockMessage, PeerMessage, PeerRequest, PeerRequestSender}, network::{NetworkHandle, NetworkHandleMessage}, - peers::PeersManager, + peers::{PeersManager, ReputationChangeKind}, session::SessionManager, state::NetworkState, swarm::{Swarm, SwarmEvent}, @@ -188,13 +188,16 @@ where /// Event hook for an unexpected message from the peer. fn on_invalid_message( - &self, + &mut self, peer_id: PeerId, _capabilities: Arc, _message: CapabilityMessage, ) { trace!(target : "net", ?peer_id, "received unexpected message"); - // TODO: disconnect? + self.swarm + .state_mut() + .peers_mut() + .apply_reputation_change(&peer_id, ReputationChangeKind::BadProtocol); } /// Sends an event to the [`TransactionsManager`] if configured @@ -235,7 +238,10 @@ where } }, Err(_err) => { - // TODO report peer for bad block + self.swarm + .state_mut() + .peers_mut() + .apply_reputation_change(&peer, ReputationChangeKind::BadBlock); } } } @@ -326,6 +332,9 @@ where NetworkHandleMessage::DisconnectPeer(peer_id) => { self.swarm.sessions_mut().disconnect(peer_id, None); } + NetworkHandleMessage::ReputationChange(peer_id, kind) => { + self.swarm.state_mut().peers_mut().apply_reputation_change(&peer_id, kind); + } } } } @@ -380,7 +389,13 @@ where SwarmEvent::OutgoingTcpConnection { remote_addr } => { trace!(target : "net", ?remote_addr,"Starting outbound connection."); } - SwarmEvent::SessionEstablished { peer_id, remote_addr, capabilities, messages } => { + SwarmEvent::SessionEstablished { + peer_id, + remote_addr, + capabilities, + messages, + direction, + } => { let total_active = this.num_active_peers.fetch_add(1, Ordering::Relaxed) + 1; trace!( target : "net", @@ -390,6 +405,10 @@ where "Session established" ); + if direction.is_incoming() { + this.swarm.state_mut().peers_mut().on_active_session(peer_id, remote_addr); + } + this.event_listeners.send(NetworkEvent::SessionEstablished { peer_id, capabilities, @@ -407,11 +426,35 @@ where "Session disconnected" ); + if error.is_some() { + // If the connection was closed due to an error, we report the peer + this.swarm.state_mut().peers_mut().on_connection_dropped(&peer_id); + } else { + // Gracefully disconnected + this.swarm.state_mut().peers_mut().on_disconnected(&peer_id); + } + this.event_listeners.send(NetworkEvent::SessionClosed { peer_id }); } SwarmEvent::IncomingPendingSessionClosed { .. } => {} - SwarmEvent::OutgoingPendingSessionClosed { .. } => {} - SwarmEvent::OutgoingConnectionError { .. } => {} + SwarmEvent::OutgoingPendingSessionClosed { peer_id, .. } => { + this.swarm + .state_mut() + .peers_mut() + .apply_reputation_change(&peer_id, ReputationChangeKind::FailedToConnect); + } + SwarmEvent::OutgoingConnectionError { peer_id, .. } => { + this.swarm + .state_mut() + .peers_mut() + .apply_reputation_change(&peer_id, ReputationChangeKind::FailedToConnect); + } + SwarmEvent::BadMessage { peer_id } => { + this.swarm + .state_mut() + .peers_mut() + .apply_reputation_change(&peer_id, ReputationChangeKind::FailedToConnect); + } } } diff --git a/crates/net/network/src/message.rs b/crates/net/network/src/message.rs index e659b8054e..c97238421d 100644 --- a/crates/net/network/src/message.rs +++ b/crates/net/network/src/message.rs @@ -10,7 +10,7 @@ use reth_eth_wire::{ NewBlock, NewBlockHashes, NewPooledTransactionHashes, NodeData, PooledTransactions, Receipts, Transactions, }; -use reth_interfaces::p2p::error::RequestResult; +use reth_interfaces::p2p::error::{RequestError, RequestResult}; use reth_primitives::{Header, PeerId, Receipt, TransactionSigned, H256}; use std::{ sync::Arc, @@ -126,6 +126,22 @@ pub enum PeerRequest { // === impl PeerRequest === impl PeerRequest { + /// Invoked if we received a response which does not match the request + pub(crate) fn send_bad_response(self) { + self.send_err_response(RequestError::BadResponse) + } + + /// Send an error back to the receiver. + pub(crate) fn send_err_response(self, err: RequestError) { + let _ = match self { + PeerRequest::GetBlockHeaders { response, .. } => response.send(Err(err)).ok(), + PeerRequest::GetBlockBodies { response, .. } => response.send(Err(err)).ok(), + PeerRequest::GetPooledTransactions { response, .. } => response.send(Err(err)).ok(), + PeerRequest::GetNodeData { response, .. } => response.send(Err(err)).ok(), + PeerRequest::GetReceipts { response, .. } => response.send(Err(err)).ok(), + }; + } + /// Returns the [`EthMessage`] for this type pub fn create_request_message(&self, request_id: u64) -> EthMessage { match self { diff --git a/crates/net/network/src/network.rs b/crates/net/network/src/network.rs index e5487f1940..236223a2b2 100644 --- a/crates/net/network/src/network.rs +++ b/crates/net/network/src/network.rs @@ -1,4 +1,9 @@ -use crate::{config::NetworkMode, manager::NetworkEvent, message::PeerRequest, peers::PeersHandle}; +use crate::{ + config::NetworkMode, + manager::NetworkEvent, + message::PeerRequest, + peers::{PeersHandle, ReputationChangeKind}, +}; use parking_lot::Mutex; use reth_eth_wire::{NewBlock, NewPooledTransactionHashes, Transactions}; use reth_primitives::{PeerId, H256}; @@ -88,7 +93,12 @@ impl NetworkHandle { /// Sends a message to the [`NetworkManager`] to disconnect an existing connection to the given /// peer. pub fn disconnect_peer(&self, peer: PeerId) { - let _ = self.inner.to_manager_tx.send(NetworkHandleMessage::DisconnectPeer(peer)); + self.send_message(NetworkHandleMessage::DisconnectPeer(peer)) + } + + /// Send a reputation change for the given peer. + pub fn reputation_change(&self, peer_id: PeerId, kind: ReputationChangeKind) { + self.send_message(NetworkHandleMessage::ReputationChange(peer_id, kind)); } /// Sends a [`PeerRequest`] to the given peer's session. @@ -134,4 +144,6 @@ pub(crate) enum NetworkHandleMessage { /// The request to send to the peer's sessions. request: PeerRequest, }, + /// Apply a reputation change to the given peer. + ReputationChange(PeerId, ReputationChangeKind), } diff --git a/crates/net/network/src/peers.rs b/crates/net/network/src/peers/manager.rs similarity index 77% rename from crates/net/network/src/peers.rs rename to crates/net/network/src/peers/manager.rs index 607686fbee..34d5199a90 100644 --- a/crates/net/network/src/peers.rs +++ b/crates/net/network/src/peers/manager.rs @@ -1,3 +1,4 @@ +use crate::peers::{reputation::BANNED_REPUTATION, ReputationChangeKind, ReputationChangeWeights}; use futures::StreamExt; use reth_eth_wire::DisconnectReason; use reth_primitives::PeerId; @@ -13,18 +14,21 @@ use tokio::{ }; use tokio_stream::wrappers::UnboundedReceiverStream; -/// The reputation value below which new connection from/to peers are rejected. -pub const BANNED_REPUTATION: i32 = 0; - -/// The reputation change to apply to a node that dropped the connection. -const REMOTE_DISCONNECT_REPUTATION_CHANGE: i32 = -100; - /// A communication channel to the [`PeersManager`] to apply changes to the peer set. pub struct PeersHandle { /// Sender half of command channel back to the [`PeersManager`] manager_tx: mpsc::UnboundedSender, } +// === impl PeersHandle === + +impl PeersHandle { + /// Send a reputation change for the given peer + pub fn reputation_change(&self, peer_id: PeerId, kind: ReputationChangeKind) { + let _ = self.manager_tx.send(PeerCommand::ReputationChange(peer_id, kind)); + } +} + /// Maintains the state of _all_ the peers known to the network. /// /// This is supposed to be owned by the network itself, but can be reached via the [`PeersHandle`]. @@ -42,6 +46,8 @@ pub(crate) struct PeersManager { queued_actions: VecDeque, /// Interval for triggering connections if there are free slots. refill_slots_interval: Interval, + /// How to weigh reputation changes + reputation_weights: ReputationChangeWeights, /// Tracks current slot stats. connection_info: ConnectionInfo, } @@ -49,13 +55,14 @@ pub(crate) struct PeersManager { impl PeersManager { /// Create a new instance with the given config pub(crate) fn new(config: PeersConfig) -> Self { - let PeersConfig { refill_slots_interval, connection_info } = config; + let PeersConfig { refill_slots_interval, connection_info, reputation_weights } = config; let (manager_tx, handle_rx) = mpsc::unbounded_channel(); Self { peers: Default::default(), manager_tx, handle_rx: UnboundedReceiverStream::new(handle_rx), queued_actions: Default::default(), + reputation_weights, refill_slots_interval: tokio::time::interval_at( Instant::now() + refill_slots_interval, refill_slots_interval, @@ -94,14 +101,44 @@ impl PeersManager { self.connection_info.inc_in(); } - /// Called when a session to a peer was disconnected. - /// - /// Accepts an additional [`ReputationChange`] value to apply to the peer. - pub(crate) fn on_disconnected(&mut self, peer: PeerId, reputation_change: ReputationChange) { - if let Some(mut peer) = self.peers.get_mut(&peer) { + /// Apply the corresponding reputation change to the given peer + pub(crate) fn apply_reputation_change(&mut self, peer_id: &PeerId, rep: ReputationChangeKind) { + let reputation_change = self.reputation_weights.change(rep); + let should_disconnect = if let Some(mut peer) = self.peers.get_mut(peer_id) { + peer.reputation = peer.reputation.saturating_sub(reputation_change.as_i32()); + let should_disconnect = peer.state.is_connected() && peer.is_banned(); + + if should_disconnect { + peer.state.disconnect(); + } + + should_disconnect + } else { + false + }; + + if should_disconnect { + // start the disconnect process + self.queued_actions + .push_back(PeerAction::Disconnect { peer_id: *peer_id, reason: None }) + } + } + + /// Gracefully disconnected + pub(crate) fn on_disconnected(&mut self, peer_id: &PeerId) { + if let Some(mut peer) = self.peers.get_mut(peer_id) { self.connection_info.decr_state(peer.state); peer.state = PeerConnectionState::Idle; - peer.reputation -= reputation_change.0; + } + } + + /// Called when a session to a peer was forcefully disconnected. + pub(crate) fn on_connection_dropped(&mut self, peer_id: &PeerId) { + let reputation_change = self.reputation_weights.change(ReputationChangeKind::Dropped); + if let Some(mut peer) = self.peers.get_mut(peer_id) { + self.connection_info.decr_state(peer.state); + peer.state = PeerConnectionState::Idle; + peer.reputation = peer.reputation.saturating_sub(reputation_change.as_i32()); } } @@ -190,10 +227,13 @@ impl PeersManager { while let Poll::Ready(Some(cmd)) = self.handle_rx.poll_next_unpin(cx) { match cmd { - PeerCommand::Add { peer_id, addr } => { + PeerCommand::Add(peer_id, addr) => { self.add_discovered_node(peer_id, addr); } PeerCommand::Remove(peer) => self.remove_discovered_node(peer), + PeerCommand::ReputationChange(peer_id, rep) => { + self.apply_reputation_change(&peer_id, rep) + } } } @@ -232,8 +272,8 @@ impl ConnectionInfo { fn decr_state(&mut self, state: PeerConnectionState) { match state { PeerConnectionState::Idle => {} - PeerConnectionState::In => self.decr_in(), - PeerConnectionState::Out => self.decr_out(), + PeerConnectionState::DisconnectingIn | PeerConnectionState::In => self.decr_in(), + PeerConnectionState::DisconnectingOut | PeerConnectionState::Out => self.decr_out(), } } @@ -288,6 +328,10 @@ enum PeerConnectionState { /// Not connected currently. #[default] Idle, + /// Disconnect of an incoming connection in progress + DisconnectingIn, + /// Disconnect of an outgoing connection in progress + DisconnectingOut, /// Connected via incoming connection. In, /// Connected via outgoing connection. @@ -297,6 +341,16 @@ enum PeerConnectionState { // === impl PeerConnectionState === impl PeerConnectionState { + /// Sets the disconnect state + #[inline] + fn disconnect(&mut self) { + match self { + PeerConnectionState::In => *self = PeerConnectionState::DisconnectingIn, + PeerConnectionState::Out => *self = PeerConnectionState::DisconnectingOut, + _ => {} + } + } + /// Returns whether we're currently connected with this peer #[inline] fn is_connected(&self) -> bool { @@ -310,37 +364,16 @@ impl PeerConnectionState { } } -/// Represents a change in a peer's reputation. -#[derive(Debug, Copy, Clone, Default)] -pub(crate) struct ReputationChange(i32); - -// === impl ReputationChange === - -impl ReputationChange { - /// Apply no reputation change. - pub(crate) const fn none() -> Self { - Self(0) - } - - /// Reputation change for a peer that dropped the connection. - pub(crate) const fn dropped() -> Self { - Self(REMOTE_DISCONNECT_REPUTATION_CHANGE) - } -} - /// Commands the [`PeersManager`] listens for. pub(crate) enum PeerCommand { /// Command for manually add - Add { - /// Identifier of the peer. - peer_id: PeerId, - /// The address of the peer - addr: SocketAddr, - }, + Add(PeerId, SocketAddr), /// Remove a peer from the set /// - /// If currently connected this will disconnect the sessin + /// If currently connected this will disconnect the session Remove(PeerId), + /// Apply a reputation change to the given peer. + ReputationChange(PeerId, ReputationChangeKind), } /// Actions the peer manager can trigger. @@ -370,6 +403,8 @@ pub struct PeersConfig { pub refill_slots_interval: Duration, /// Restrictions on connections pub connection_info: ConnectionInfo, + /// How to weigh reputation changes + pub reputation_weights: ReputationChangeWeights, } impl Default for PeersConfig { @@ -382,6 +417,7 @@ impl Default for PeersConfig { max_outbound: 70, max_inbound: 30, }, + reputation_weights: Default::default(), } } } diff --git a/crates/net/network/src/peers/mod.rs b/crates/net/network/src/peers/mod.rs new file mode 100644 index 0000000000..fe28f6c62a --- /dev/null +++ b/crates/net/network/src/peers/mod.rs @@ -0,0 +1,9 @@ +//! Peer related implementations + +mod manager; +mod reputation; + +pub(crate) use manager::{PeerAction, PeersManager}; +pub use manager::{PeersConfig, PeersHandle}; +pub(crate) use reputation::ReputationChange; +pub use reputation::{ReputationChangeKind, ReputationChangeWeights}; diff --git a/crates/net/network/src/peers/reputation.rs b/crates/net/network/src/peers/reputation.rs new file mode 100644 index 0000000000..ac1d0d6e83 --- /dev/null +++ b/crates/net/network/src/peers/reputation.rs @@ -0,0 +1,136 @@ +//! Peer reputation management + +/// The type that tracks the reputation score. +pub(crate) type Reputation = i32; + +/// The minimal unit we're measuring reputation +const REPUTATION_UNIT: i32 = -1024; + +/// The reputation value below which new connection from/to peers are rejected. +pub(crate) const BANNED_REPUTATION: i32 = 100 * REPUTATION_UNIT; + +/// The reputation change to apply to a peer that dropped the connection. +const REMOTE_DISCONNECT_REPUTATION_CHANGE: i32 = 4 * REPUTATION_UNIT; + +/// The reputation change to apply to a peer that we failed to connect to. +const FAILED_TO_CONNECT_REPUTATION_CHANGE: i32 = 24 * REPUTATION_UNIT; + +/// The reputation change to apply to a peer that failed to respond in time. +const TIMEOUT_REPUTATION_CHANGE: i32 = REPUTATION_UNIT; + +/// The reputation change to apply to a peer that sent a bad message. +const BAD_MESSAGE_REPUTATION_CHANGE: i32 = 8 * REPUTATION_UNIT; + +/// The reputation change to apply to a peer which violates protocol rules: minimal reputation +const BAD_PROTOCOL_REPUTATION_CHANGE: i32 = i32::MIN; + +/// Various kinds of reputation changes. +#[derive(Debug, Copy, Clone)] +pub enum ReputationChangeKind { + /// Received an unspecific bad message from the peer + BadMessage, + /// Peer sent a bad block. + /// + /// Note: this will we only used in pre-merge, pow consensus, since after no more block announcements are sent via devp2p: [EIP-3675](https://eips.ethereum.org/EIPS/eip-3675#devp2p) + BadBlock, + /// Peer sent a bad transaction messages. E.g. Transactions which weren't recoverable. + BadTransactions, + /// Peer failed to respond in time. + Timeout, + /// Peer does not adhere to network protocol rules. + BadProtocol, + /// Failed to establish a connection to the peer. + FailedToConnect, + /// Connection dropped by peer. + Dropped, + /// Apply a reputation change by value + Other(Reputation), +} + +/// How the [`ReputationChangeKind`] are weighted. +#[derive(Debug, Clone)] +pub struct ReputationChangeWeights { + /// Weight for [`ReputationChangeKind::BadMessage`] + pub bad_message: Reputation, + /// Weight for [`ReputationChangeKind::BadBlock`] + pub bad_block: Reputation, + /// Weight for [`ReputationChangeKind::BadTransactions`] + pub bad_transactions: Reputation, + /// Weight for [`ReputationChangeKind::Timeout`] + pub timeout: Reputation, + /// Weight for [`ReputationChangeKind::BadProtocol`] + pub bad_protocol: Reputation, + /// Weight for [`ReputationChangeKind::FailedToConnect`] + pub failed_to_connect: Reputation, + /// Weight for [`ReputationChangeKind::Dropped`] + pub dropped: Reputation, +} + +// === impl ReputationChangeWeights === + +impl ReputationChangeWeights { + /// Returns the quantifiable [`ReputationChange`] for the given [`ReputationChangeKind`] using + /// the configured weights + pub(crate) fn change(&self, kind: ReputationChangeKind) -> ReputationChange { + match kind { + ReputationChangeKind::BadMessage => self.bad_message.into(), + ReputationChangeKind::BadBlock => self.bad_block.into(), + ReputationChangeKind::BadTransactions => self.bad_transactions.into(), + ReputationChangeKind::Timeout => self.timeout.into(), + ReputationChangeKind::BadProtocol => self.bad_protocol.into(), + ReputationChangeKind::FailedToConnect => self.failed_to_connect.into(), + ReputationChangeKind::Dropped => self.dropped.into(), + ReputationChangeKind::Other(val) => val.into(), + } + } +} + +impl Default for ReputationChangeWeights { + fn default() -> Self { + Self { + bad_block: BAD_MESSAGE_REPUTATION_CHANGE, + bad_transactions: BAD_MESSAGE_REPUTATION_CHANGE, + bad_message: BAD_MESSAGE_REPUTATION_CHANGE, + timeout: TIMEOUT_REPUTATION_CHANGE, + bad_protocol: BAD_PROTOCOL_REPUTATION_CHANGE, + failed_to_connect: FAILED_TO_CONNECT_REPUTATION_CHANGE, + dropped: REMOTE_DISCONNECT_REPUTATION_CHANGE, + } + } +} + +/// Represents a change in a peer's reputation. +#[derive(Debug, Copy, Clone, Default)] +pub(crate) struct ReputationChange(Reputation); + +// === impl ReputationChange === + +impl ReputationChange { + /// Apply no reputation change. + pub(crate) const fn none() -> Self { + Self(0) + } + + /// Reputation change for a peer that dropped the connection. + pub(crate) const fn dropped() -> Self { + Self(REMOTE_DISCONNECT_REPUTATION_CHANGE) + } + + /// Helper type for easier conversion + #[inline] + pub(crate) fn as_i32(self) -> Reputation { + self.0 + } +} + +impl From for Reputation { + fn from(value: ReputationChange) -> Self { + value.0 + } +} + +impl From for ReputationChange { + fn from(value: Reputation) -> Self { + ReputationChange(value) + } +} diff --git a/crates/net/network/src/session/active.rs b/crates/net/network/src/session/active.rs index 810fc4a703..53f4eea2d2 100644 --- a/crates/net/network/src/session/active.rs +++ b/crates/net/network/src/session/active.rs @@ -110,17 +110,18 @@ impl ActiveSession { /// Processes a response received from the peer macro_rules! on_response { - ($resp:ident, $item:ident) => { + ($this:ident, $resp:ident, $item:ident) => { let RequestPair { request_id, message } = $resp; #[allow(clippy::collapsible_match)] if let Some(resp) = self.inflight_requests.remove(&request_id) { if let PeerRequest::$item { response, .. } = resp { let _ = response.send(Ok(message)); } else { - // TODO handle bad response + resp.send_bad_response(); + $this.on_bad_message(); } } else { - // TODO handle unexpected response + $this.on_bad_message() } }; } @@ -147,31 +148,31 @@ impl ActiveSession { on_request!(req, BlockHeaders, GetBlockHeaders); } EthMessage::BlockHeaders(resp) => { - on_response!(resp, GetBlockHeaders); + on_response!(self, resp, GetBlockHeaders); } EthMessage::GetBlockBodies(req) => { on_request!(req, BlockBodies, GetBlockBodies); } EthMessage::BlockBodies(resp) => { - on_response!(resp, GetBlockBodies); + on_response!(self, resp, GetBlockBodies); } EthMessage::GetPooledTransactions(req) => { on_request!(req, PooledTransactions, GetPooledTransactions); } EthMessage::PooledTransactions(resp) => { - on_response!(resp, GetPooledTransactions); + on_response!(self, resp, GetPooledTransactions); } EthMessage::GetNodeData(req) => { on_request!(req, NodeData, GetNodeData); } EthMessage::NodeData(resp) => { - on_response!(resp, GetNodeData); + on_response!(self, resp, GetNodeData); } EthMessage::GetReceipts(req) => { on_request!(req, Receipts, GetReceipts); } EthMessage::Receipts(resp) => { - on_response!(resp, GetReceipts); + on_response!(self, resp, GetReceipts); } }; @@ -241,6 +242,13 @@ impl ActiveSession { .try_send(ActiveSessionMessage::ValidMessage { peer_id: self.remote_peer_id, message }) } + /// Notify the manager that the peer sent a bad message + fn on_bad_message(&self) { + let _ = self + .to_session + .try_send(ActiveSessionMessage::BadMessage { peer_id: self.remote_peer_id }); + } + /// Report back that this session has been closed. fn emit_disconnect(&self) { // NOTE: we clone here so there's enough capacity to deliver this message @@ -514,8 +522,8 @@ mod tests { remote_addr, peer_id, capabilities, - status: _, conn, + .. } => { let (_to_session_tx, messages_rx) = mpsc::channel(10); let (commands_to_session, commands_rx) = mpsc::channel(10); diff --git a/crates/net/network/src/session/handle.rs b/crates/net/network/src/session/handle.rs index f83eafbdbf..72bfc02b75 100644 --- a/crates/net/network/src/session/handle.rs +++ b/crates/net/network/src/session/handle.rs @@ -72,6 +72,7 @@ pub(crate) enum PendingSessionEvent { capabilities: Arc, status: Status, conn: EthStream>>, + direction: Direction, }, /// Handshake unsuccessful, session was disconnected. Disconnected { @@ -88,7 +89,12 @@ pub(crate) enum PendingSessionEvent { error: io::Error, }, /// Thrown when authentication via Ecies failed. - EciesAuthError { remote_addr: SocketAddr, session_id: SessionId, error: ECIESError }, + EciesAuthError { + remote_addr: SocketAddr, + session_id: SessionId, + error: ECIESError, + direction: Direction, + }, } /// Commands that can be sent to the spawned session. @@ -132,4 +138,9 @@ pub(crate) enum ActiveSessionMessage { /// Message received from the peer. message: CapabilityMessage, }, + /// Received a bad message from the peer. + BadMessage { + /// Identifier of the remote peer. + peer_id: PeerId, + }, } diff --git a/crates/net/network/src/session/mod.rs b/crates/net/network/src/session/mod.rs index 55f1e561d4..a0030df798 100644 --- a/crates/net/network/src/session/mod.rs +++ b/crates/net/network/src/session/mod.rs @@ -246,6 +246,9 @@ impl SessionManager { ActiveSessionMessage::InvalidMessage { peer_id, capabilities, message } => { Poll::Ready(SessionEvent::InvalidMessage { peer_id, message, capabilities }) } + ActiveSessionMessage::BadMessage { peer_id } => { + Poll::Ready(SessionEvent::BadMessage { peer_id }) + } } } } @@ -273,6 +276,7 @@ impl SessionManager { capabilities, conn, status, + direction, } => { // move from pending to established. let _ = self.pending_sessions.remove(&session_id); @@ -317,6 +321,7 @@ impl SessionManager { capabilities, status, messages, + direction, }) } PendingSessionEvent::Disconnected { remote_addr, session_id, direction, error } => { @@ -358,12 +363,18 @@ impl SessionManager { "connection refused" ); let _ = self.pending_sessions.remove(&session_id); - return Poll::Ready(SessionEvent::IncomingPendingSessionClosed { + return Poll::Ready(SessionEvent::OutgoingPendingSessionClosed { remote_addr, + peer_id, error: None, }) } - PendingSessionEvent::EciesAuthError { remote_addr, session_id, error } => { + PendingSessionEvent::EciesAuthError { + remote_addr, + session_id, + error, + direction, + } => { let _ = self.pending_sessions.remove(&session_id); warn!( target : "net::session", @@ -373,10 +384,21 @@ impl SessionManager { "ecies auth failed" ); let _ = self.pending_sessions.remove(&session_id); - return Poll::Ready(SessionEvent::IncomingPendingSessionClosed { - remote_addr, - error: None, - }) + return match direction { + Direction::Incoming => { + Poll::Ready(SessionEvent::IncomingPendingSessionClosed { + remote_addr, + error: None, + }) + } + Direction::Outgoing(peer_id) => { + Poll::Ready(SessionEvent::OutgoingPendingSessionClosed { + remote_addr, + peer_id, + error: None, + }) + } + } } } } @@ -431,6 +453,7 @@ pub(crate) enum SessionEvent { capabilities: Arc, status: Status, messages: PeerRequestSender, + direction: Direction, }, /// A session received a valid message via RLPx. ValidMessage { @@ -446,6 +469,11 @@ pub(crate) enum SessionEvent { /// Message received from the peer. message: CapabilityMessage, }, + /// Received a bad message from the peer. + BadMessage { + /// Identifier of the remote peer. + peer_id: PeerId, + }, /// Closed an incoming pending session during authentication. IncomingPendingSessionClosed { remote_addr: SocketAddr, error: Option }, /// Closed an outgoing pending session during authentication. @@ -555,6 +583,13 @@ pub(crate) enum Direction { Outgoing(PeerId), } +impl Direction { + /// Returns `true` if this an incoming connection. + pub(crate) fn is_incoming(&self) -> bool { + matches!(self, Direction::Incoming) + } +} + async fn authenticate( disconnect_rx: oneshot::Receiver<()>, events: mpsc::Sender, @@ -572,7 +607,12 @@ async fn authenticate( Ok(stream) => stream, Err(error) => { let _ = events - .send(PendingSessionEvent::EciesAuthError { remote_addr, session_id, error }) + .send(PendingSessionEvent::EciesAuthError { + remote_addr, + session_id, + error, + direction, + }) .await; return } @@ -586,6 +626,7 @@ async fn authenticate( remote_addr, session_id, error, + direction, }) .await; return @@ -669,5 +710,6 @@ async fn authenticate_stream( capabilities: Arc::new(Capabilities::from(their_hello.capabilities)), status: their_status, conn: eth_stream, + direction, } } diff --git a/crates/net/network/src/state.rs b/crates/net/network/src/state.rs index ce317de360..ee52d44833 100644 --- a/crates/net/network/src/state.rs +++ b/crates/net/network/src/state.rs @@ -80,6 +80,11 @@ where } } + /// Returns mutable access to the [`PeersManager`] + pub(crate) fn peers_mut(&mut self) -> &mut PeersManager { + &mut self.peers_manager + } + /// How many peers we're currently connected to. pub fn num_connected_peers(&self) -> usize { self.connected_peers.len() diff --git a/crates/net/network/src/swarm.rs b/crates/net/network/src/swarm.rs index 401ff16792..f5c123a1e8 100644 --- a/crates/net/network/src/swarm.rs +++ b/crates/net/network/src/swarm.rs @@ -1,7 +1,7 @@ use crate::{ listener::{ConnectionListener, ListenerEvent}, message::{PeerMessage, PeerRequestSender}, - session::{SessionEvent, SessionId, SessionManager}, + session::{Direction, SessionEvent, SessionId, SessionManager}, state::{AddSessionError, NetworkState, StateAction}, }; use futures::Stream; @@ -29,7 +29,7 @@ use tracing::warn; /// [`SessionsManager`]. Outgoing connections are either initiated on demand or triggered by the /// [`NetworkState`] and also delegated to the [`NetworkState`]. #[must_use = "Swarm does nothing unless polled"] -pub struct Swarm { +pub(crate) struct Swarm { /// Listens for new incoming connections. incoming: ConnectionListener, /// All sessions. @@ -86,6 +86,7 @@ where capabilities, status, messages, + direction, } => match self.state.on_session_activated( peer_id, capabilities.clone(), @@ -97,13 +98,15 @@ where remote_addr, capabilities, messages, + direction, }), Err(err) => { match err { AddSessionError::AtCapacity { peer } => { - self.sessions.disconnect(peer, Some(DisconnectReason::TooManyPeers)) + self.sessions.disconnect(peer, Some(DisconnectReason::TooManyPeers)); } }; + self.state.peers_mut().on_disconnected(&peer_id); None } }, @@ -125,14 +128,12 @@ where } SessionEvent::SessionClosedOnConnectionError { peer_id, remote_addr, error } => { self.state.on_session_closed(peer_id); - - // TODO(mattsse): reputation change on error - Some(SwarmEvent::SessionClosed { peer_id, remote_addr, error: Some(error) }) } SessionEvent::OutgoingConnectionError { remote_addr, peer_id, error } => { Some(SwarmEvent::OutgoingConnectionError { peer_id, remote_addr, error }) } + SessionEvent::BadMessage { peer_id } => Some(SwarmEvent::BadMessage { peer_id }), } } @@ -232,7 +233,7 @@ where /// All events created or delegated by the [`Swarm`] that represents changes to the state of the /// network. -pub enum SwarmEvent { +pub(crate) enum SwarmEvent { /// Events related to the actual network protocol. ValidMessage { /// The peer that sent the message @@ -248,6 +249,11 @@ pub enum SwarmEvent { /// Message received from the peer. message: CapabilityMessage, }, + /// Received a bad message from the peer. + BadMessage { + /// Identifier of the remote peer. + peer_id: PeerId, + }, /// The underlying tcp listener closed. TcpListenerClosed { /// Address of the closed listener. @@ -275,6 +281,7 @@ pub enum SwarmEvent { remote_addr: SocketAddr, capabilities: Arc, messages: PeerRequestSender, + direction: Direction, }, SessionClosed { peer_id: PeerId, diff --git a/crates/net/network/src/transactions.rs b/crates/net/network/src/transactions.rs index 3ddcd51ef0..3a0e2a7f94 100644 --- a/crates/net/network/src/transactions.rs +++ b/crates/net/network/src/transactions.rs @@ -5,6 +5,7 @@ use crate::{ manager::NetworkEvent, message::{PeerRequest, PeerRequestSender}, network::NetworkHandleMessage, + peers::ReputationChangeKind, NetworkHandle, }; use futures::{stream::FuturesUnordered, FutureExt, StreamExt}; @@ -230,13 +231,14 @@ where /// Starts the import process for the given transactions. fn import_transactions(&mut self, peer_id: PeerId, transactions: Vec) { + let mut has_bad_transactions = false; if let Some(peer) = self.peers.get_mut(&peer_id) { for tx in transactions { // recover transaction let tx = if let Some(tx) = tx.into_ecrecovered() { tx } else { - // TODO: report peer? + has_bad_transactions = true; continue }; @@ -263,17 +265,25 @@ where } } } - } - fn on_good_import(&mut self, hash: TxHash) { - if let Some(_peers) = self.transactions_by_peers.remove(&hash) { - // TODO report good peer? + if has_bad_transactions { + self.report_bad_message(peer_id); } } + fn report_bad_message(&self, peer_id: PeerId) { + self.network.reputation_change(peer_id, ReputationChangeKind::BadTransactions); + } + + fn on_good_import(&mut self, hash: TxHash) { + self.transactions_by_peers.remove(&hash); + } + fn on_bad_import(&mut self, hash: TxHash) { - if let Some(_peers) = self.transactions_by_peers.remove(&hash) { - // TODO report bad peer? + if let Some(peers) = self.transactions_by_peers.remove(&hash) { + for peer_id in peers { + self.report_bad_message(peer_id); + } } } } @@ -329,10 +339,10 @@ where this.import_transactions(req.peer_id, txs.0); } Poll::Ready(Ok(Err(_))) => { - // TODO report bad peer + this.report_bad_message(req.peer_id); } Poll::Ready(Err(_)) => { - // TODO report bad peer + this.report_bad_message(req.peer_id); } } } diff --git a/crates/net/network/tests/it/connect.rs b/crates/net/network/tests/it/connect.rs index bb63e3c84c..db9e84a84d 100644 --- a/crates/net/network/tests/it/connect.rs +++ b/crates/net/network/tests/it/connect.rs @@ -42,7 +42,7 @@ async fn test_establish_connections() { let net = handle.terminate().await; - net.for_each(|peer| { - assert!(peer.num_peers() >= 1); - }); + assert_eq!(net.peers()[0].num_peers(), 2); + assert_eq!(net.peers()[1].num_peers(), 1); + assert_eq!(net.peers()[2].num_peers(), 1); }