diff --git a/crates/net/network/src/manager.rs b/crates/net/network/src/manager.rs index f21121a224..76c85f95c6 100644 --- a/crates/net/network/src/manager.rs +++ b/crates/net/network/src/manager.rs @@ -505,9 +505,9 @@ where "Session disconnected" ); - if error.is_some() { + if let Some(ref err) = error { // If the connection was closed due to an error, we report the peer - this.swarm.state_mut().peers_mut().on_connection_dropped(&peer_id); + this.swarm.state_mut().peers_mut().on_connection_dropped(&peer_id, err); } else { // Gracefully disconnected this.swarm.state_mut().peers_mut().on_disconnected(&peer_id); diff --git a/crates/net/network/src/message.rs b/crates/net/network/src/message.rs index 441ef6c526..d6cfddef55 100644 --- a/crates/net/network/src/message.rs +++ b/crates/net/network/src/message.rs @@ -13,6 +13,7 @@ use reth_eth_wire::{ use reth_interfaces::p2p::error::{RequestError, RequestResult}; use reth_primitives::{Header, PeerId, Receipt, TransactionSigned, H256}; use std::{ + fmt, sync::Arc, task::{ready, Context, Poll}, }; @@ -253,7 +254,7 @@ impl PeerResponseResult { } /// A Cloneable connection for sending _requests_ directly to the session of a peer. -#[derive(Debug, Clone)] +#[derive(Clone)] pub struct PeerRequestSender { /// id of the remote node. pub(crate) peer_id: PeerId, @@ -274,3 +275,9 @@ impl PeerRequestSender { &self.peer_id } } + +impl fmt::Debug for PeerRequestSender { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.debug_struct("PeerRequestSender").field("peer_id", &self.peer_id).finish_non_exhaustive() + } +} diff --git a/crates/net/network/src/peers/manager.rs b/crates/net/network/src/peers/manager.rs index ad1517afb7..2699ed3b7b 100644 --- a/crates/net/network/src/peers/manager.rs +++ b/crates/net/network/src/peers/manager.rs @@ -1,6 +1,9 @@ use crate::peers::{reputation::BANNED_REPUTATION, ReputationChangeKind, ReputationChangeWeights}; use futures::StreamExt; -use reth_eth_wire::DisconnectReason; +use reth_eth_wire::{ + error::{EthStreamError, HandshakeError, P2PHandshakeError, P2PStreamError}, + DisconnectReason, +}; use reth_primitives::PeerId; use std::{ collections::{hash_map::Entry, HashMap, HashSet, VecDeque}, @@ -121,6 +124,7 @@ impl PeersManager { pub(crate) fn on_closed_incoming_pending_session(&mut self) { self.connection_info.decr_in() } + /// Invoked when a pending session was closed. pub(crate) fn on_closed_outgoing_pending_session(&mut self) { self.connection_info.decr_out() @@ -184,16 +188,29 @@ impl PeersManager { self.connection_info.decr_state(peer.state); peer.state = PeerConnectionState::Idle; } + + self.fill_outbound_slots(); } /// Called when a session to a peer was forcefully disconnected. - pub(crate) fn on_connection_dropped(&mut self, peer_id: &PeerId) { - let reputation_change = self.reputation_weights.change(ReputationChangeKind::Dropped); - if let Some(mut peer) = self.peers.get_mut(peer_id) { + /// + /// Depending on whether the error is fatal, the peer will be removed from the peer set + /// otherwise its reputation is slashed. + pub(crate) fn on_connection_dropped(&mut self, peer_id: &PeerId, err: &EthStreamError) { + if is_fatal_protocol_error(err) { + // remove the peer to which we can't establish a connection due to protocol related + // issues. + if let Some(peer) = self.peers.remove(peer_id) { + self.connection_info.decr_state(peer.state); + } + } else if let Some(mut peer) = self.peers.get_mut(peer_id) { self.connection_info.decr_state(peer.state); peer.state = PeerConnectionState::Idle; + let reputation_change = self.reputation_weights.change(ReputationChangeKind::Dropped); peer.reputation = peer.reputation.saturating_sub(reputation_change.as_i32()); } + + self.fill_outbound_slots(); } /// Called for a newly discovered peer. @@ -209,12 +226,15 @@ impl PeersManager { Entry::Occupied(mut entry) => { let node = entry.get_mut(); node.addr = addr; + return } Entry::Vacant(entry) => { trace!(target : "net::peers", ?peer_id, ?addr, "discovered new node"); entry.insert(Peer::new(addr)); } } + + self.fill_outbound_slots(); } /// Removes the tracked node from the set. @@ -574,6 +594,26 @@ impl Display for InboundConnectionError { } } +/// Returns true if the error indicates that we'll never be able to establish a connection to that +/// peer. For example, not matching capabilities or a mismatch in protocols. +fn is_fatal_protocol_error(err: &EthStreamError) -> bool { + match err { + EthStreamError::P2PStreamError(err) => { + matches!( + err, + P2PStreamError::HandshakeError(P2PHandshakeError::NoSharedCapabilities) | + P2PStreamError::UnknownReservedMessageId(_) | + P2PStreamError::EmptyProtocolMessage | + P2PStreamError::ParseVersionError(_) | + P2PStreamError::Disconnected(DisconnectReason::UselessPeer) | + P2PStreamError::MismatchedProtocolVersion { .. } + ) + } + EthStreamError::HandshakeError(err) => !matches!(err, HandshakeError::NoResponse), + _ => false, + } +} + #[cfg(test)] mod test { use std::{