diff --git a/Cargo.lock b/Cargo.lock index 0ce14ecdb7..d8fe496de3 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3585,6 +3585,7 @@ version = "0.1.0" dependencies = [ "aquamarine", "async-trait", + "auto_impl", "bytes", "enr 0.7.0", "ethers-core", diff --git a/crates/net/network/Cargo.toml b/crates/net/network/Cargo.toml index 1022acc602..01b8a11923 100644 --- a/crates/net/network/Cargo.toml +++ b/crates/net/network/Cargo.toml @@ -36,6 +36,7 @@ tokio = { version = "1", features = ["io-util", "net", "macros", "rt-multi-threa tokio-stream = "0.1" # misc +auto_impl = "1" aquamarine = "0.1" # docs tracing = "0.1" fnv = "1.0" diff --git a/crates/net/network/src/error.rs b/crates/net/network/src/error.rs index dbe1a7e44c..c73f4de683 100644 --- a/crates/net/network/src/error.rs +++ b/crates/net/network/src/error.rs @@ -1,5 +1,6 @@ //! Possible errors when interacting with the network. +use crate::session::PendingSessionHandshakeError; use reth_eth_wire::{ error::{EthStreamError, HandshakeError, P2PHandshakeError, P2PStreamError}, DisconnectReason, @@ -16,47 +17,90 @@ pub enum NetworkError { Discovery(std::io::Error), } -/// Returns true if the error indicates that the corresponding peer should be removed from peer -/// discovery, for example if it's using a different genesis hash. -pub(crate) fn error_merits_discovery_ban(err: &EthStreamError) -> bool { - match err { - EthStreamError::P2PStreamError(P2PStreamError::HandshakeError( - P2PHandshakeError::HelloNotInHandshake, - )) | - EthStreamError::P2PStreamError(P2PStreamError::HandshakeError( - P2PHandshakeError::NonHelloMessageInHandshake, - )) => true, - EthStreamError::HandshakeError(err) => !matches!(err, HandshakeError::NoResponse), - _ => false, +/// Abstraction over errors that can lead to a failed session +#[auto_impl::auto_impl(&)] +pub(crate) trait SessionError { + /// Returns true if the error indicates that the corresponding peer should be removed from peer + /// discovery, for example if it's using a different genesis hash. + fn merits_discovery_ban(&self) -> bool; + + /// Returns true if the error indicates that we'll never be able to establish a connection to + /// that peer. For example, not matching capabilities or a mismatch in protocols. + /// + /// Note: This does not necessarily mean that either of the peers are in violation of the + /// protocol but rather that they'll never be able to connect with each other. This check is + /// a superset of [`error_merits_discovery_ban`] which checks if the peer should not be part + /// of the gossip network. + fn is_fatal_protocol_error(&self) -> bool; + + /// Returns true if the error should lead to backoff, temporarily preventing additional + /// connection attempts + fn should_backoff(&self) -> bool; +} + +impl SessionError for EthStreamError { + fn merits_discovery_ban(&self) -> bool { + match self { + EthStreamError::P2PStreamError(P2PStreamError::HandshakeError( + P2PHandshakeError::HelloNotInHandshake, + )) | + EthStreamError::P2PStreamError(P2PStreamError::HandshakeError( + P2PHandshakeError::NonHelloMessageInHandshake, + )) => true, + EthStreamError::HandshakeError(err) => !matches!(err, HandshakeError::NoResponse), + _ => false, + } + } + + fn is_fatal_protocol_error(&self) -> bool { + match self { + EthStreamError::P2PStreamError(err) => { + matches!( + err, + P2PStreamError::HandshakeError(P2PHandshakeError::NoSharedCapabilities) | + P2PStreamError::HandshakeError(P2PHandshakeError::HelloNotInHandshake) | + P2PStreamError::HandshakeError( + P2PHandshakeError::NonHelloMessageInHandshake + ) | + P2PStreamError::UnknownReservedMessageId(_) | + P2PStreamError::EmptyProtocolMessage | + P2PStreamError::ParseVersionError(_) | + P2PStreamError::Disconnected(DisconnectReason::UselessPeer) | + P2PStreamError::Disconnected( + DisconnectReason::IncompatibleP2PProtocolVersion + ) | + P2PStreamError::MismatchedProtocolVersion { .. } + ) + } + EthStreamError::HandshakeError(err) => !matches!(err, HandshakeError::NoResponse), + _ => false, + } + } + + fn should_backoff(&self) -> bool { + Some(DisconnectReason::TooManyPeers) == self.as_disconnected() } } -/// Returns true if the error indicates that we'll never be able to establish a connection to that -/// peer. For example, not matching capabilities or a mismatch in protocols. -/// -/// Note: This does not necessarily mean that either of the peers are in violation of the protocol -/// but rather that they'll never be able to connect with each other. This check is a superset of -/// [`error_merits_discovery_ban`] which checks if the peer should not be part of the gossip -/// network. -pub(crate) fn is_fatal_protocol_error(err: &EthStreamError) -> bool { - match err { - EthStreamError::P2PStreamError(err) => { - matches!( - err, - P2PStreamError::HandshakeError(P2PHandshakeError::NoSharedCapabilities) | - P2PStreamError::HandshakeError(P2PHandshakeError::HelloNotInHandshake) | - P2PStreamError::HandshakeError(P2PHandshakeError::NonHelloMessageInHandshake) | - P2PStreamError::UnknownReservedMessageId(_) | - P2PStreamError::EmptyProtocolMessage | - P2PStreamError::ParseVersionError(_) | - P2PStreamError::Disconnected(DisconnectReason::UselessPeer) | - P2PStreamError::Disconnected( - DisconnectReason::IncompatibleP2PProtocolVersion - ) | - P2PStreamError::MismatchedProtocolVersion { .. } - ) +impl SessionError for PendingSessionHandshakeError { + fn merits_discovery_ban(&self) -> bool { + match self { + PendingSessionHandshakeError::Eth(eth) => eth.merits_discovery_ban(), + PendingSessionHandshakeError::Ecies(_) => true, + } + } + + fn is_fatal_protocol_error(&self) -> bool { + match self { + PendingSessionHandshakeError::Eth(eth) => eth.is_fatal_protocol_error(), + PendingSessionHandshakeError::Ecies(_) => true, + } + } + + fn should_backoff(&self) -> bool { + match self { + PendingSessionHandshakeError::Eth(eth) => eth.should_backoff(), + PendingSessionHandshakeError::Ecies(_) => true, } - EthStreamError::HandshakeError(err) => !matches!(err, HandshakeError::NoResponse), - _ => false, } } diff --git a/crates/net/network/src/manager.rs b/crates/net/network/src/manager.rs index 2d23962c73..932c1063f4 100644 --- a/crates/net/network/src/manager.rs +++ b/crates/net/network/src/manager.rs @@ -18,7 +18,7 @@ use crate::{ config::NetworkConfig, discovery::Discovery, - error::NetworkError, + error::{NetworkError, SessionError}, eth_requests::IncomingEthRequest, import::{BlockImport, BlockImportOutcome, BlockValidation}, listener::ConnectionListener, @@ -575,7 +575,7 @@ where let mut reason = None; if let Some(ref err) = error { // If the connection was closed due to an error, we report the peer - this.swarm.state_mut().peers_mut().on_connection_dropped( + this.swarm.state_mut().peers_mut().on_active_session_dropped( &remote_addr, &peer_id, err, @@ -583,7 +583,10 @@ where reason = err.as_disconnected(); } else { // Gracefully disconnected - this.swarm.state_mut().peers_mut().on_disconnected(peer_id); + this.swarm + .state_mut() + .peers_mut() + .on_active_session_gracefully_closed(peer_id); } this.event_listeners.send(NetworkEvent::SessionClosed { peer_id, reason }); @@ -609,12 +612,18 @@ where ?error, "Outgoing pending session failed" ); - let peers = this.swarm.state_mut().peers_mut(); - peers.on_closed_outgoing_pending_session(&peer_id); - peers.apply_reputation_change(&peer_id, ReputationChangeKind::FailedToConnect); - if error.map(|err| err.merits_discovery_ban()).unwrap_or_default() { - this.swarm.state_mut().ban_discovery(peer_id, remote_addr.ip()); + if let Some(ref err) = error { + this.swarm.state_mut().peers_mut().on_pending_session_dropped( + &remote_addr, + &peer_id, + err, + ); + } else { + this.swarm + .state_mut() + .peers_mut() + .on_pending_session_gracefully_closed(&peer_id); } } SwarmEvent::OutgoingConnectionError { remote_addr, peer_id, error } => { diff --git a/crates/net/network/src/peers/manager.rs b/crates/net/network/src/peers/manager.rs index d5475388de..1604fea27e 100644 --- a/crates/net/network/src/peers/manager.rs +++ b/crates/net/network/src/peers/manager.rs @@ -1,10 +1,10 @@ use crate::{ - error::{error_merits_discovery_ban, is_fatal_protocol_error}, + error::SessionError, peers::{ - reputation::{is_banned_reputation, DEFAULT_REPUTATION}, + reputation::{is_banned_reputation, BACKOFF_REPUTATION_CHANGE, DEFAULT_REPUTATION}, ReputationChangeKind, ReputationChangeWeights, }, - session::Direction, + session::{Direction, PendingSessionHandshakeError}, }; use futures::StreamExt; use reth_eth_wire::{error::EthStreamError, DisconnectReason}; @@ -17,8 +17,6 @@ use std::{ task::{Context, Poll}, time::Duration, }; - -use crate::peers::reputation::BACKOFF_REPUTATION_CHANGE; use thiserror::Error; use tokio::{ sync::{mpsc, oneshot}, @@ -162,14 +160,6 @@ impl PeersManager { self.connection_info.decr_in() } - /// Invoked when a pending outgoing session was closed. - pub(crate) fn on_closed_outgoing_pending_session(&mut self, peer_id: &PeerId) { - if let Some(mut peer) = self.peers.get_mut(peer_id) { - peer.state = PeerConnectionState::Idle; - } - self.connection_info.decr_out() - } - /// Called when a new _incoming_ active session was established to the given peer. /// /// This will update the state of the peer if not yet tracked. @@ -241,8 +231,28 @@ impl PeersManager { } } - /// Gracefully disconnected - pub(crate) fn on_disconnected(&mut self, peer_id: PeerId) { + /// Gracefully disconnected a pending session + pub(crate) fn on_pending_session_gracefully_closed(&mut self, peer_id: &PeerId) { + if let Some(mut peer) = self.peers.get_mut(peer_id) { + peer.state = PeerConnectionState::Idle; + } else { + return + } + self.connection_info.decr_out() + } + + /// Invoked when a pending outgoing session was closed during authentication or the handshake. + pub(crate) fn on_pending_session_dropped( + &mut self, + remote_addr: &SocketAddr, + peer_id: &PeerId, + err: &PendingSessionHandshakeError, + ) { + self.on_connection_failure(remote_addr, peer_id, err, ReputationChangeKind::FailedToConnect) + } + + /// Gracefully disconnected an active session + pub(crate) fn on_active_session_gracefully_closed(&mut self, peer_id: PeerId) { match self.peers.entry(peer_id) { Entry::Occupied(mut entry) => { self.connection_info.decr_state(entry.get().state); @@ -261,6 +271,63 @@ impl PeersManager { self.fill_outbound_slots(); } + /// Called when an _active_ session to a peer was forcefully dropped due to an error. + /// + /// Depending on whether the error is fatal, the peer will be removed from the peer set + /// otherwise its reputation is slashed. + pub(crate) fn on_active_session_dropped( + &mut self, + remote_addr: &SocketAddr, + peer_id: &PeerId, + err: &EthStreamError, + ) { + self.on_connection_failure(remote_addr, peer_id, err, ReputationChangeKind::Dropped) + } + + fn on_connection_failure( + &mut self, + remote_addr: &SocketAddr, + peer_id: &PeerId, + err: impl SessionError, + reputation_change: ReputationChangeKind, + ) { + if err.is_fatal_protocol_error() { + // remove the peer to which we can't establish a connection due to protocol related + // issues. + if let Some(peer) = self.peers.remove(peer_id) { + self.connection_info.decr_state(peer.state); + } + + // ban the peer + self.ban_peer(*peer_id); + + // If the error is caused by a peer that should be banned from discovery + if err.merits_discovery_ban() { + self.queued_actions.push_back(PeerAction::DiscoveryBan { + peer_id: *peer_id, + ip_addr: remote_addr.ip(), + }) + } + } else { + let reputation_change = if err.should_backoff() { + // The peer has signaled that it is currently unable to process any more + // connections, so we will hold off on attempting any new connections for a while + self.backoff_peer(*peer_id); + BACKOFF_REPUTATION_CHANGE.into() + } else { + self.reputation_weights.change(reputation_change) + }; + + if let Some(mut peer) = self.peers.get_mut(peer_id) { + self.connection_info.decr_state(peer.state); + peer.state = PeerConnectionState::Idle; + peer.reputation = peer.reputation.saturating_add(reputation_change.as_i32()); + } + } + + self.fill_outbound_slots(); + } + /// Invoked if a session was disconnected because there's already a connection to the peer. /// /// If the session was an outgoing connection, this means that the peer initiated a connection @@ -275,55 +342,6 @@ impl PeersManager { } } - /// Called when a session to a peer was forcefully disconnected. - /// - /// Depending on whether the error is fatal, the peer will be removed from the peer set - /// otherwise its reputation is slashed. - pub(crate) fn on_connection_dropped( - &mut self, - remote_addr: &SocketAddr, - peer_id: &PeerId, - err: &EthStreamError, - ) { - if is_fatal_protocol_error(err) { - // remove the peer to which we can't establish a connection due to protocol related - // issues. - if let Some(peer) = self.peers.remove(peer_id) { - self.connection_info.decr_state(peer.state); - } - - // ban the peer - self.ban_peer(*peer_id); - - // If the error is caused by a peer that should be banned from discovery - if error_merits_discovery_ban(err) { - self.queued_actions.push_back(PeerAction::DiscoveryBan { - peer_id: *peer_id, - ip_addr: remote_addr.ip(), - }) - } - } else { - let reputation_change = if let Some(DisconnectReason::TooManyPeers) = - err.as_disconnected() - { - // The peer has signaled that it is currently unable to process any more - // connections, so we will hold off on attempting any new connections for a while - self.backoff_peer(*peer_id); - BACKOFF_REPUTATION_CHANGE.into() - } else { - self.reputation_weights.change(ReputationChangeKind::Dropped) - }; - - if let Some(mut peer) = self.peers.get_mut(peer_id) { - self.connection_info.decr_state(peer.state); - peer.state = PeerConnectionState::Idle; - peer.reputation = peer.reputation.saturating_add(reputation_change.as_i32()); - } - } - - self.fill_outbound_slots(); - } - /// Called as follow-up for a discovered peer. /// /// The [`ForkId`] is retrieved from an ENR record that the peer announces over the discovery @@ -803,6 +821,7 @@ mod test { manager::{ConnectionInfo, PeerConnectionState}, PeerAction, ReputationChangeKind, }, + session::PendingSessionHandshakeError, PeersConfig, }; use reth_eth_wire::{ @@ -899,7 +918,7 @@ mod test { }) .await; - peers.on_connection_dropped( + peers.on_active_session_dropped( &socket_addr, &peer, &EthStreamError::P2PStreamError(P2PStreamError::Disconnected( @@ -934,7 +953,7 @@ mod test { } #[tokio::test] - async fn test_ban_on_drop() { + async fn test_ban_on_active_drop() { let peer = PeerId::random(); let socket_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 2)), 8008); let mut peers = PeersManager::default(); @@ -953,7 +972,7 @@ mod test { }) .await; - peers.on_connection_dropped( + peers.on_active_session_dropped( &socket_addr, &peer, &EthStreamError::P2PStreamError(P2PStreamError::Disconnected( @@ -977,6 +996,50 @@ mod test { assert!(peers.peers.get(&peer).is_none()); } + #[tokio::test] + async fn test_ban_on_pending_drop() { + let peer = PeerId::random(); + let socket_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 2)), 8008); + let mut peers = PeersManager::default(); + peers.add_discovered_node(peer, socket_addr); + + match event!(peers) { + PeerAction::Connect { peer_id, .. } => { + assert_eq!(peer_id, peer); + } + _ => unreachable!(), + } + + poll_fn(|cx| { + assert!(peers.poll(cx).is_pending()); + Poll::Ready(()) + }) + .await; + + peers.on_pending_session_dropped( + &socket_addr, + &peer, + &PendingSessionHandshakeError::Eth(EthStreamError::P2PStreamError( + P2PStreamError::Disconnected(DisconnectReason::UselessPeer), + )), + ); + + match event!(peers) { + PeerAction::BanPeer { peer_id } => { + assert_eq!(peer_id, peer); + } + _ => unreachable!(), + } + + poll_fn(|cx| { + assert!(peers.poll(cx).is_pending()); + Poll::Ready(()) + }) + .await; + + assert!(peers.peers.get(&peer).is_none()); + } + #[tokio::test] async fn test_reputation_change() { let peer = PeerId::random(); @@ -1038,7 +1101,7 @@ mod test { assert_eq!(p.state, PeerConnectionState::DisconnectingOut); assert!(p.is_banned()); - peers.on_disconnected(peer); + peers.on_active_session_gracefully_closed(peer); let p = peers.peers.get(&peer).unwrap(); assert_eq!(p.state, PeerConnectionState::Idle); @@ -1086,7 +1149,7 @@ mod test { let p = peers.peers.get(&peer).unwrap(); assert_eq!(p.state, PeerConnectionState::DisconnectingOut); - peers.on_disconnected(peer); + peers.on_active_session_gracefully_closed(peer); assert!(peers.peers.get(&peer).is_none()); } diff --git a/crates/net/network/src/session/mod.rs b/crates/net/network/src/session/mod.rs index daee59de50..aa848c60a0 100644 --- a/crates/net/network/src/session/mod.rs +++ b/crates/net/network/src/session/mod.rs @@ -13,13 +13,14 @@ use crate::{ }; use fnv::FnvHashMap; use futures::{future::Either, io, FutureExt, StreamExt}; -use reth_ecies::stream::ECIESStream; +use reth_ecies::{stream::ECIESStream, ECIESError}; use reth_eth_wire::{ capability::{Capabilities, CapabilityMessage}, error::EthStreamError, DisconnectReason, HelloMessage, Status, UnauthedEthStream, UnauthedP2PStream, }; use reth_primitives::{ForkFilter, ForkId, ForkTransition, PeerId, H256, U256}; +use reth_tasks::TaskExecutor; use secp256k1::SecretKey; use std::{ collections::HashMap, @@ -40,10 +41,6 @@ mod active; mod config; mod handle; pub use config::SessionsConfig; -use reth_ecies::ECIESError; - -use crate::error::error_merits_discovery_ban; -use reth_tasks::TaskExecutor; /// Internal identifier for active sessions. #[derive(Debug, Clone, Copy, PartialOrd, PartialEq, Eq, Hash)] @@ -550,19 +547,6 @@ pub(crate) enum PendingSessionHandshakeError { Ecies(ECIESError), } -// === impl PendingSessionHandshakeError === - -impl PendingSessionHandshakeError { - /// Returns true if the error indicates that the corresponding peer should be removed from peer - /// discover - pub(crate) fn merits_discovery_ban(&self) -> bool { - match self { - PendingSessionHandshakeError::Eth(eth) => error_merits_discovery_ban(eth), - PendingSessionHandshakeError::Ecies(_) => true, - } - } -} - /// The direction of the connection. #[derive(Debug, Copy, Clone)] pub(crate) enum Direction {