fix(net): improve dropped connection handling (#412)

This commit is contained in:
Matthias Seitz
2022-12-13 18:26:48 +01:00
committed by GitHub
parent 95d99c4317
commit 8e5e7ff5b6
3 changed files with 54 additions and 7 deletions

View File

@@ -505,9 +505,9 @@ where
"Session disconnected"
);
if error.is_some() {
if let Some(ref err) = error {
// If the connection was closed due to an error, we report the peer
this.swarm.state_mut().peers_mut().on_connection_dropped(&peer_id);
this.swarm.state_mut().peers_mut().on_connection_dropped(&peer_id, err);
} else {
// Gracefully disconnected
this.swarm.state_mut().peers_mut().on_disconnected(&peer_id);

View File

@@ -13,6 +13,7 @@ use reth_eth_wire::{
use reth_interfaces::p2p::error::{RequestError, RequestResult};
use reth_primitives::{Header, PeerId, Receipt, TransactionSigned, H256};
use std::{
fmt,
sync::Arc,
task::{ready, Context, Poll},
};
@@ -253,7 +254,7 @@ impl PeerResponseResult {
}
/// A Cloneable connection for sending _requests_ directly to the session of a peer.
#[derive(Debug, Clone)]
#[derive(Clone)]
pub struct PeerRequestSender {
/// id of the remote node.
pub(crate) peer_id: PeerId,
@@ -274,3 +275,9 @@ impl PeerRequestSender {
&self.peer_id
}
}
impl fmt::Debug for PeerRequestSender {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_struct("PeerRequestSender").field("peer_id", &self.peer_id).finish_non_exhaustive()
}
}

View File

@@ -1,6 +1,9 @@
use crate::peers::{reputation::BANNED_REPUTATION, ReputationChangeKind, ReputationChangeWeights};
use futures::StreamExt;
use reth_eth_wire::DisconnectReason;
use reth_eth_wire::{
error::{EthStreamError, HandshakeError, P2PHandshakeError, P2PStreamError},
DisconnectReason,
};
use reth_primitives::PeerId;
use std::{
collections::{hash_map::Entry, HashMap, HashSet, VecDeque},
@@ -121,6 +124,7 @@ impl PeersManager {
pub(crate) fn on_closed_incoming_pending_session(&mut self) {
self.connection_info.decr_in()
}
/// Invoked when a pending session was closed.
pub(crate) fn on_closed_outgoing_pending_session(&mut self) {
self.connection_info.decr_out()
@@ -184,16 +188,29 @@ impl PeersManager {
self.connection_info.decr_state(peer.state);
peer.state = PeerConnectionState::Idle;
}
self.fill_outbound_slots();
}
/// Called when a session to a peer was forcefully disconnected.
pub(crate) fn on_connection_dropped(&mut self, peer_id: &PeerId) {
let reputation_change = self.reputation_weights.change(ReputationChangeKind::Dropped);
if let Some(mut peer) = self.peers.get_mut(peer_id) {
///
/// Depending on whether the error is fatal, the peer will be removed from the peer set
/// otherwise its reputation is slashed.
pub(crate) fn on_connection_dropped(&mut self, peer_id: &PeerId, err: &EthStreamError) {
if is_fatal_protocol_error(err) {
// remove the peer to which we can't establish a connection due to protocol related
// issues.
if let Some(peer) = self.peers.remove(peer_id) {
self.connection_info.decr_state(peer.state);
}
} else if let Some(mut peer) = self.peers.get_mut(peer_id) {
self.connection_info.decr_state(peer.state);
peer.state = PeerConnectionState::Idle;
let reputation_change = self.reputation_weights.change(ReputationChangeKind::Dropped);
peer.reputation = peer.reputation.saturating_sub(reputation_change.as_i32());
}
self.fill_outbound_slots();
}
/// Called for a newly discovered peer.
@@ -209,12 +226,15 @@ impl PeersManager {
Entry::Occupied(mut entry) => {
let node = entry.get_mut();
node.addr = addr;
return
}
Entry::Vacant(entry) => {
trace!(target : "net::peers", ?peer_id, ?addr, "discovered new node");
entry.insert(Peer::new(addr));
}
}
self.fill_outbound_slots();
}
/// Removes the tracked node from the set.
@@ -574,6 +594,26 @@ impl Display for InboundConnectionError {
}
}
/// Returns true if the error indicates that we'll never be able to establish a connection to that
/// peer. For example, not matching capabilities or a mismatch in protocols.
fn is_fatal_protocol_error(err: &EthStreamError) -> bool {
match err {
EthStreamError::P2PStreamError(err) => {
matches!(
err,
P2PStreamError::HandshakeError(P2PHandshakeError::NoSharedCapabilities) |
P2PStreamError::UnknownReservedMessageId(_) |
P2PStreamError::EmptyProtocolMessage |
P2PStreamError::ParseVersionError(_) |
P2PStreamError::Disconnected(DisconnectReason::UselessPeer) |
P2PStreamError::MismatchedProtocolVersion { .. }
)
}
EthStreamError::HandshakeError(err) => !matches!(err, HandshakeError::NoResponse),
_ => false,
}
}
#[cfg(test)]
mod test {
use std::{