refactor(net): unify dropped connection handling (#595)

This commit is contained in:
Matthias Seitz
2022-12-23 21:46:44 +01:00
committed by GitHub
parent 284391c181
commit 4e677b5993
6 changed files with 235 additions and 133 deletions

1
Cargo.lock generated
View File

@@ -3585,6 +3585,7 @@ version = "0.1.0"
dependencies = [
"aquamarine",
"async-trait",
"auto_impl",
"bytes",
"enr 0.7.0",
"ethers-core",

View File

@@ -36,6 +36,7 @@ tokio = { version = "1", features = ["io-util", "net", "macros", "rt-multi-threa
tokio-stream = "0.1"
# misc
auto_impl = "1"
aquamarine = "0.1" # docs
tracing = "0.1"
fnv = "1.0"

View File

@@ -1,5 +1,6 @@
//! Possible errors when interacting with the network.
use crate::session::PendingSessionHandshakeError;
use reth_eth_wire::{
error::{EthStreamError, HandshakeError, P2PHandshakeError, P2PStreamError},
DisconnectReason,
@@ -16,47 +17,90 @@ pub enum NetworkError {
Discovery(std::io::Error),
}
/// Returns true if the error indicates that the corresponding peer should be removed from peer
/// discovery, for example if it's using a different genesis hash.
pub(crate) fn error_merits_discovery_ban(err: &EthStreamError) -> bool {
match err {
EthStreamError::P2PStreamError(P2PStreamError::HandshakeError(
P2PHandshakeError::HelloNotInHandshake,
)) |
EthStreamError::P2PStreamError(P2PStreamError::HandshakeError(
P2PHandshakeError::NonHelloMessageInHandshake,
)) => true,
EthStreamError::HandshakeError(err) => !matches!(err, HandshakeError::NoResponse),
_ => false,
/// Abstraction over errors that can lead to a failed session
#[auto_impl::auto_impl(&)]
pub(crate) trait SessionError {
/// Returns true if the error indicates that the corresponding peer should be removed from peer
/// discovery, for example if it's using a different genesis hash.
fn merits_discovery_ban(&self) -> bool;
/// Returns true if the error indicates that we'll never be able to establish a connection to
/// that peer. For example, not matching capabilities or a mismatch in protocols.
///
/// Note: This does not necessarily mean that either of the peers are in violation of the
/// protocol but rather that they'll never be able to connect with each other. This check is
/// a superset of [`error_merits_discovery_ban`] which checks if the peer should not be part
/// of the gossip network.
fn is_fatal_protocol_error(&self) -> bool;
/// Returns true if the error should lead to backoff, temporarily preventing additional
/// connection attempts
fn should_backoff(&self) -> bool;
}
impl SessionError for EthStreamError {
fn merits_discovery_ban(&self) -> bool {
match self {
EthStreamError::P2PStreamError(P2PStreamError::HandshakeError(
P2PHandshakeError::HelloNotInHandshake,
)) |
EthStreamError::P2PStreamError(P2PStreamError::HandshakeError(
P2PHandshakeError::NonHelloMessageInHandshake,
)) => true,
EthStreamError::HandshakeError(err) => !matches!(err, HandshakeError::NoResponse),
_ => false,
}
}
fn is_fatal_protocol_error(&self) -> bool {
match self {
EthStreamError::P2PStreamError(err) => {
matches!(
err,
P2PStreamError::HandshakeError(P2PHandshakeError::NoSharedCapabilities) |
P2PStreamError::HandshakeError(P2PHandshakeError::HelloNotInHandshake) |
P2PStreamError::HandshakeError(
P2PHandshakeError::NonHelloMessageInHandshake
) |
P2PStreamError::UnknownReservedMessageId(_) |
P2PStreamError::EmptyProtocolMessage |
P2PStreamError::ParseVersionError(_) |
P2PStreamError::Disconnected(DisconnectReason::UselessPeer) |
P2PStreamError::Disconnected(
DisconnectReason::IncompatibleP2PProtocolVersion
) |
P2PStreamError::MismatchedProtocolVersion { .. }
)
}
EthStreamError::HandshakeError(err) => !matches!(err, HandshakeError::NoResponse),
_ => false,
}
}
fn should_backoff(&self) -> bool {
Some(DisconnectReason::TooManyPeers) == self.as_disconnected()
}
}
/// Returns true if the error indicates that we'll never be able to establish a connection to that
/// peer. For example, not matching capabilities or a mismatch in protocols.
///
/// Note: This does not necessarily mean that either of the peers are in violation of the protocol
/// but rather that they'll never be able to connect with each other. This check is a superset of
/// [`error_merits_discovery_ban`] which checks if the peer should not be part of the gossip
/// network.
pub(crate) fn is_fatal_protocol_error(err: &EthStreamError) -> bool {
match err {
EthStreamError::P2PStreamError(err) => {
matches!(
err,
P2PStreamError::HandshakeError(P2PHandshakeError::NoSharedCapabilities) |
P2PStreamError::HandshakeError(P2PHandshakeError::HelloNotInHandshake) |
P2PStreamError::HandshakeError(P2PHandshakeError::NonHelloMessageInHandshake) |
P2PStreamError::UnknownReservedMessageId(_) |
P2PStreamError::EmptyProtocolMessage |
P2PStreamError::ParseVersionError(_) |
P2PStreamError::Disconnected(DisconnectReason::UselessPeer) |
P2PStreamError::Disconnected(
DisconnectReason::IncompatibleP2PProtocolVersion
) |
P2PStreamError::MismatchedProtocolVersion { .. }
)
impl SessionError for PendingSessionHandshakeError {
fn merits_discovery_ban(&self) -> bool {
match self {
PendingSessionHandshakeError::Eth(eth) => eth.merits_discovery_ban(),
PendingSessionHandshakeError::Ecies(_) => true,
}
}
fn is_fatal_protocol_error(&self) -> bool {
match self {
PendingSessionHandshakeError::Eth(eth) => eth.is_fatal_protocol_error(),
PendingSessionHandshakeError::Ecies(_) => true,
}
}
fn should_backoff(&self) -> bool {
match self {
PendingSessionHandshakeError::Eth(eth) => eth.should_backoff(),
PendingSessionHandshakeError::Ecies(_) => true,
}
EthStreamError::HandshakeError(err) => !matches!(err, HandshakeError::NoResponse),
_ => false,
}
}

View File

@@ -18,7 +18,7 @@
use crate::{
config::NetworkConfig,
discovery::Discovery,
error::NetworkError,
error::{NetworkError, SessionError},
eth_requests::IncomingEthRequest,
import::{BlockImport, BlockImportOutcome, BlockValidation},
listener::ConnectionListener,
@@ -575,7 +575,7 @@ where
let mut reason = None;
if let Some(ref err) = error {
// If the connection was closed due to an error, we report the peer
this.swarm.state_mut().peers_mut().on_connection_dropped(
this.swarm.state_mut().peers_mut().on_active_session_dropped(
&remote_addr,
&peer_id,
err,
@@ -583,7 +583,10 @@ where
reason = err.as_disconnected();
} else {
// Gracefully disconnected
this.swarm.state_mut().peers_mut().on_disconnected(peer_id);
this.swarm
.state_mut()
.peers_mut()
.on_active_session_gracefully_closed(peer_id);
}
this.event_listeners.send(NetworkEvent::SessionClosed { peer_id, reason });
@@ -609,12 +612,18 @@ where
?error,
"Outgoing pending session failed"
);
let peers = this.swarm.state_mut().peers_mut();
peers.on_closed_outgoing_pending_session(&peer_id);
peers.apply_reputation_change(&peer_id, ReputationChangeKind::FailedToConnect);
if error.map(|err| err.merits_discovery_ban()).unwrap_or_default() {
this.swarm.state_mut().ban_discovery(peer_id, remote_addr.ip());
if let Some(ref err) = error {
this.swarm.state_mut().peers_mut().on_pending_session_dropped(
&remote_addr,
&peer_id,
err,
);
} else {
this.swarm
.state_mut()
.peers_mut()
.on_pending_session_gracefully_closed(&peer_id);
}
}
SwarmEvent::OutgoingConnectionError { remote_addr, peer_id, error } => {

View File

@@ -1,10 +1,10 @@
use crate::{
error::{error_merits_discovery_ban, is_fatal_protocol_error},
error::SessionError,
peers::{
reputation::{is_banned_reputation, DEFAULT_REPUTATION},
reputation::{is_banned_reputation, BACKOFF_REPUTATION_CHANGE, DEFAULT_REPUTATION},
ReputationChangeKind, ReputationChangeWeights,
},
session::Direction,
session::{Direction, PendingSessionHandshakeError},
};
use futures::StreamExt;
use reth_eth_wire::{error::EthStreamError, DisconnectReason};
@@ -17,8 +17,6 @@ use std::{
task::{Context, Poll},
time::Duration,
};
use crate::peers::reputation::BACKOFF_REPUTATION_CHANGE;
use thiserror::Error;
use tokio::{
sync::{mpsc, oneshot},
@@ -162,14 +160,6 @@ impl PeersManager {
self.connection_info.decr_in()
}
/// Invoked when a pending outgoing session was closed.
pub(crate) fn on_closed_outgoing_pending_session(&mut self, peer_id: &PeerId) {
if let Some(mut peer) = self.peers.get_mut(peer_id) {
peer.state = PeerConnectionState::Idle;
}
self.connection_info.decr_out()
}
/// Called when a new _incoming_ active session was established to the given peer.
///
/// This will update the state of the peer if not yet tracked.
@@ -241,8 +231,28 @@ impl PeersManager {
}
}
/// Gracefully disconnected
pub(crate) fn on_disconnected(&mut self, peer_id: PeerId) {
/// Gracefully disconnected a pending session
pub(crate) fn on_pending_session_gracefully_closed(&mut self, peer_id: &PeerId) {
if let Some(mut peer) = self.peers.get_mut(peer_id) {
peer.state = PeerConnectionState::Idle;
} else {
return
}
self.connection_info.decr_out()
}
/// Invoked when a pending outgoing session was closed during authentication or the handshake.
pub(crate) fn on_pending_session_dropped(
&mut self,
remote_addr: &SocketAddr,
peer_id: &PeerId,
err: &PendingSessionHandshakeError,
) {
self.on_connection_failure(remote_addr, peer_id, err, ReputationChangeKind::FailedToConnect)
}
/// Gracefully disconnected an active session
pub(crate) fn on_active_session_gracefully_closed(&mut self, peer_id: PeerId) {
match self.peers.entry(peer_id) {
Entry::Occupied(mut entry) => {
self.connection_info.decr_state(entry.get().state);
@@ -261,6 +271,63 @@ impl PeersManager {
self.fill_outbound_slots();
}
/// Called when an _active_ session to a peer was forcefully dropped due to an error.
///
/// Depending on whether the error is fatal, the peer will be removed from the peer set
/// otherwise its reputation is slashed.
pub(crate) fn on_active_session_dropped(
&mut self,
remote_addr: &SocketAddr,
peer_id: &PeerId,
err: &EthStreamError,
) {
self.on_connection_failure(remote_addr, peer_id, err, ReputationChangeKind::Dropped)
}
fn on_connection_failure(
&mut self,
remote_addr: &SocketAddr,
peer_id: &PeerId,
err: impl SessionError,
reputation_change: ReputationChangeKind,
) {
if err.is_fatal_protocol_error() {
// remove the peer to which we can't establish a connection due to protocol related
// issues.
if let Some(peer) = self.peers.remove(peer_id) {
self.connection_info.decr_state(peer.state);
}
// ban the peer
self.ban_peer(*peer_id);
// If the error is caused by a peer that should be banned from discovery
if err.merits_discovery_ban() {
self.queued_actions.push_back(PeerAction::DiscoveryBan {
peer_id: *peer_id,
ip_addr: remote_addr.ip(),
})
}
} else {
let reputation_change = if err.should_backoff() {
// The peer has signaled that it is currently unable to process any more
// connections, so we will hold off on attempting any new connections for a while
self.backoff_peer(*peer_id);
BACKOFF_REPUTATION_CHANGE.into()
} else {
self.reputation_weights.change(reputation_change)
};
if let Some(mut peer) = self.peers.get_mut(peer_id) {
self.connection_info.decr_state(peer.state);
peer.state = PeerConnectionState::Idle;
peer.reputation = peer.reputation.saturating_add(reputation_change.as_i32());
}
}
self.fill_outbound_slots();
}
/// Invoked if a session was disconnected because there's already a connection to the peer.
///
/// If the session was an outgoing connection, this means that the peer initiated a connection
@@ -275,55 +342,6 @@ impl PeersManager {
}
}
/// Called when a session to a peer was forcefully disconnected.
///
/// Depending on whether the error is fatal, the peer will be removed from the peer set
/// otherwise its reputation is slashed.
pub(crate) fn on_connection_dropped(
&mut self,
remote_addr: &SocketAddr,
peer_id: &PeerId,
err: &EthStreamError,
) {
if is_fatal_protocol_error(err) {
// remove the peer to which we can't establish a connection due to protocol related
// issues.
if let Some(peer) = self.peers.remove(peer_id) {
self.connection_info.decr_state(peer.state);
}
// ban the peer
self.ban_peer(*peer_id);
// If the error is caused by a peer that should be banned from discovery
if error_merits_discovery_ban(err) {
self.queued_actions.push_back(PeerAction::DiscoveryBan {
peer_id: *peer_id,
ip_addr: remote_addr.ip(),
})
}
} else {
let reputation_change = if let Some(DisconnectReason::TooManyPeers) =
err.as_disconnected()
{
// The peer has signaled that it is currently unable to process any more
// connections, so we will hold off on attempting any new connections for a while
self.backoff_peer(*peer_id);
BACKOFF_REPUTATION_CHANGE.into()
} else {
self.reputation_weights.change(ReputationChangeKind::Dropped)
};
if let Some(mut peer) = self.peers.get_mut(peer_id) {
self.connection_info.decr_state(peer.state);
peer.state = PeerConnectionState::Idle;
peer.reputation = peer.reputation.saturating_add(reputation_change.as_i32());
}
}
self.fill_outbound_slots();
}
/// Called as follow-up for a discovered peer.
///
/// The [`ForkId`] is retrieved from an ENR record that the peer announces over the discovery
@@ -803,6 +821,7 @@ mod test {
manager::{ConnectionInfo, PeerConnectionState},
PeerAction, ReputationChangeKind,
},
session::PendingSessionHandshakeError,
PeersConfig,
};
use reth_eth_wire::{
@@ -899,7 +918,7 @@ mod test {
})
.await;
peers.on_connection_dropped(
peers.on_active_session_dropped(
&socket_addr,
&peer,
&EthStreamError::P2PStreamError(P2PStreamError::Disconnected(
@@ -934,7 +953,7 @@ mod test {
}
#[tokio::test]
async fn test_ban_on_drop() {
async fn test_ban_on_active_drop() {
let peer = PeerId::random();
let socket_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 2)), 8008);
let mut peers = PeersManager::default();
@@ -953,7 +972,7 @@ mod test {
})
.await;
peers.on_connection_dropped(
peers.on_active_session_dropped(
&socket_addr,
&peer,
&EthStreamError::P2PStreamError(P2PStreamError::Disconnected(
@@ -977,6 +996,50 @@ mod test {
assert!(peers.peers.get(&peer).is_none());
}
#[tokio::test]
async fn test_ban_on_pending_drop() {
let peer = PeerId::random();
let socket_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 2)), 8008);
let mut peers = PeersManager::default();
peers.add_discovered_node(peer, socket_addr);
match event!(peers) {
PeerAction::Connect { peer_id, .. } => {
assert_eq!(peer_id, peer);
}
_ => unreachable!(),
}
poll_fn(|cx| {
assert!(peers.poll(cx).is_pending());
Poll::Ready(())
})
.await;
peers.on_pending_session_dropped(
&socket_addr,
&peer,
&PendingSessionHandshakeError::Eth(EthStreamError::P2PStreamError(
P2PStreamError::Disconnected(DisconnectReason::UselessPeer),
)),
);
match event!(peers) {
PeerAction::BanPeer { peer_id } => {
assert_eq!(peer_id, peer);
}
_ => unreachable!(),
}
poll_fn(|cx| {
assert!(peers.poll(cx).is_pending());
Poll::Ready(())
})
.await;
assert!(peers.peers.get(&peer).is_none());
}
#[tokio::test]
async fn test_reputation_change() {
let peer = PeerId::random();
@@ -1038,7 +1101,7 @@ mod test {
assert_eq!(p.state, PeerConnectionState::DisconnectingOut);
assert!(p.is_banned());
peers.on_disconnected(peer);
peers.on_active_session_gracefully_closed(peer);
let p = peers.peers.get(&peer).unwrap();
assert_eq!(p.state, PeerConnectionState::Idle);
@@ -1086,7 +1149,7 @@ mod test {
let p = peers.peers.get(&peer).unwrap();
assert_eq!(p.state, PeerConnectionState::DisconnectingOut);
peers.on_disconnected(peer);
peers.on_active_session_gracefully_closed(peer);
assert!(peers.peers.get(&peer).is_none());
}

View File

@@ -13,13 +13,14 @@ use crate::{
};
use fnv::FnvHashMap;
use futures::{future::Either, io, FutureExt, StreamExt};
use reth_ecies::stream::ECIESStream;
use reth_ecies::{stream::ECIESStream, ECIESError};
use reth_eth_wire::{
capability::{Capabilities, CapabilityMessage},
error::EthStreamError,
DisconnectReason, HelloMessage, Status, UnauthedEthStream, UnauthedP2PStream,
};
use reth_primitives::{ForkFilter, ForkId, ForkTransition, PeerId, H256, U256};
use reth_tasks::TaskExecutor;
use secp256k1::SecretKey;
use std::{
collections::HashMap,
@@ -40,10 +41,6 @@ mod active;
mod config;
mod handle;
pub use config::SessionsConfig;
use reth_ecies::ECIESError;
use crate::error::error_merits_discovery_ban;
use reth_tasks::TaskExecutor;
/// Internal identifier for active sessions.
#[derive(Debug, Clone, Copy, PartialOrd, PartialEq, Eq, Hash)]
@@ -550,19 +547,6 @@ pub(crate) enum PendingSessionHandshakeError {
Ecies(ECIESError),
}
// === impl PendingSessionHandshakeError ===
impl PendingSessionHandshakeError {
/// Returns true if the error indicates that the corresponding peer should be removed from peer
/// discover
pub(crate) fn merits_discovery_ban(&self) -> bool {
match self {
PendingSessionHandshakeError::Eth(eth) => error_merits_discovery_ban(eth),
PendingSessionHandshakeError::Ecies(_) => true,
}
}
}
/// The direction of the connection.
#[derive(Debug, Copy, Clone)]
pub(crate) enum Direction {