refactor(net): unify closed incoming session handling (#600)

This commit is contained in:
Matthias Seitz
2022-12-24 12:38:21 +01:00
committed by GitHub
parent 43008b7b07
commit 5affa67805
6 changed files with 145 additions and 52 deletions

View File

@@ -47,21 +47,28 @@ impl BanList {
evicted
}
/// Removes all entries that should no longer be banned.
pub fn evict(&mut self, now: Instant) {
self.banned_ips.retain(|_, until| {
/// Removes all ip addresses that are no longer banned.
pub fn evict_ips(&mut self, now: Instant) -> Vec<IpAddr> {
let mut evicted = Vec::new();
self.banned_ips.retain(|peer, until| {
if let Some(until) = until {
return *until > now
if now > *until {
evicted.push(*peer);
return false
}
}
true
});
evicted
}
self.banned_peers.retain(|_, until| {
if let Some(until) = until {
return *until > now
}
true
});
/// Removes all entries that should no longer be banned.
///
/// Returns the evicted entries.
pub fn evict(&mut self, now: Instant) -> (Vec<IpAddr>, Vec<PeerId>) {
let ips = self.evict_ips(now);
let peers = self.evict_peers(now);
(ips, peers)
}
/// Returns true if either the given peer id _or_ ip address is banned.

View File

@@ -63,6 +63,15 @@ impl SessionError for EthStreamError {
P2PStreamError::HandshakeError(
P2PHandshakeError::NonHelloMessageInHandshake
) |
P2PStreamError::HandshakeError(P2PHandshakeError::Disconnected(
DisconnectReason::UselessPeer
)) |
P2PStreamError::HandshakeError(P2PHandshakeError::Disconnected(
DisconnectReason::IncompatibleP2PProtocolVersion
)) |
P2PStreamError::HandshakeError(P2PHandshakeError::Disconnected(
DisconnectReason::ProtocolBreach
)) |
P2PStreamError::UnknownReservedMessageId(_) |
P2PStreamError::EmptyProtocolMessage |
P2PStreamError::ParseVersionError(_) |
@@ -70,6 +79,7 @@ impl SessionError for EthStreamError {
P2PStreamError::Disconnected(
DisconnectReason::IncompatibleP2PProtocolVersion
) |
P2PStreamError::Disconnected(DisconnectReason::ProtocolBreach) |
P2PStreamError::MismatchedProtocolVersion { .. }
)
}
@@ -117,6 +127,17 @@ impl SessionError for PendingSessionHandshakeError {
mod tests {
use super::*;
#[test]
fn test_is_fatal_disconnect() {
let err = PendingSessionHandshakeError::Eth(EthStreamError::P2PStreamError(
P2PStreamError::HandshakeError(P2PHandshakeError::Disconnected(
DisconnectReason::UselessPeer,
)),
));
assert!(err.is_fatal_protocol_error());
}
#[test]
fn test_should_backoff() {
let err = EthStreamError::P2PStreamError(P2PStreamError::HandshakeError(

View File

@@ -18,7 +18,7 @@
use crate::{
config::NetworkConfig,
discovery::Discovery,
error::{NetworkError, SessionError},
error::NetworkError,
eth_requests::IncomingEthRequest,
import::{BlockImport, BlockImportOutcome, BlockValidation},
listener::ConnectionListener,
@@ -597,10 +597,17 @@ where
?error,
"Incoming pending session failed"
);
this.swarm.state_mut().peers_mut().on_closed_incoming_pending_session();
if error.map(|err| err.merits_discovery_ban()).unwrap_or_default() {
this.swarm.state_mut().ban_ip_discovery(remote_addr.ip());
if let Some(ref err) = error {
this.swarm
.state_mut()
.peers_mut()
.on_incoming_pending_session_dropped(remote_addr, err);
} else {
this.swarm
.state_mut()
.peers_mut()
.on_incoming_pending_session_gracefully_closed();
}
}
SwarmEvent::OutgoingPendingSessionClosed { remote_addr, peer_id, error } => {

View File

@@ -139,7 +139,7 @@ impl PeersManager {
///
/// returns an error if the inbound ip address is on the ban list or
/// we have reached our limit for max inbound connections
pub(crate) fn on_inbound_pending_session(
pub(crate) fn on_incoming_pending_session(
&mut self,
addr: IpAddr,
) -> Result<(), InboundConnectionError> {
@@ -155,8 +155,32 @@ impl PeersManager {
Ok(())
}
/// Invoked when a previous call to [Self::on_incoming_pending_session] succeeded but it was
/// rejected.
pub(crate) fn on_incoming_pending_session_rejected_internally(&mut self) {
self.connection_info.decr_in();
}
/// Invoked when a pending session was closed.
pub(crate) fn on_closed_incoming_pending_session(&mut self) {
pub(crate) fn on_incoming_pending_session_gracefully_closed(&mut self) {
self.connection_info.decr_in()
}
/// Invoked when a pending session was closed.
pub(crate) fn on_incoming_pending_session_dropped(
&mut self,
remote_addr: SocketAddr,
err: &PendingSessionHandshakeError,
) {
if err.is_fatal_protocol_error() {
self.ban_ip(remote_addr.ip());
if err.merits_discovery_ban() {
self.queued_actions
.push_back(PeerAction::DiscoveryBanIp { ip_addr: remote_addr.ip() })
}
}
self.connection_info.decr_in()
}
@@ -189,12 +213,17 @@ impl PeersManager {
}
}
/// Bans the peer temporarily with the given timeout
/// Bans the peer temporarily with the configured ban timeout
fn ban_peer(&mut self, peer_id: PeerId) {
self.ban_list.ban_peer_until(peer_id, std::time::Instant::now() + self.ban_duration);
self.queued_actions.push_back(PeerAction::BanPeer { peer_id });
}
/// Bans the IP temporarily with the configured ban timeout
fn ban_ip(&mut self, ip: IpAddr) {
self.ban_list.ban_ip_until(ip, std::time::Instant::now() + self.ban_duration);
}
/// Temporarily puts the peer in timeout
fn backoff_peer(&mut self, peer_id: PeerId) {
trace!(target: "net::peers", ?peer_id, "backing off");
@@ -307,7 +336,7 @@ impl PeersManager {
// If the error is caused by a peer that should be banned from discovery
if err.merits_discovery_ban() {
self.queued_actions.push_back(PeerAction::DiscoveryBan {
self.queued_actions.push_back(PeerAction::DiscoveryBanPeerId {
peer_id: *peer_id,
ip_addr: remote_addr.ip(),
})
@@ -489,7 +518,9 @@ impl PeersManager {
}
if self.unban_interval.poll_tick(cx).is_ready() {
for peer_id in self.ban_list.evict_peers(std::time::Instant::now()) {
let (_, unbanned_peers) = self.ban_list.evict(std::time::Instant::now());
for peer_id in unbanned_peers {
if let Some(peer) = self.peers.get_mut(&peer_id) {
peer.unban();
} else {
@@ -729,7 +760,9 @@ pub enum PeerAction {
peer_id: PeerId,
},
/// Ban the peer in discovery.
DiscoveryBan { peer_id: PeerId, ip_addr: IpAddr },
DiscoveryBanPeerId { peer_id: PeerId, ip_addr: IpAddr },
/// Ban the IP in discovery.
DiscoveryBanIp { ip_addr: IpAddr },
/// Ban the peer temporarily
BanPeer { peer_id: PeerId },
/// Unban the peer temporarily
@@ -829,7 +862,7 @@ mod test {
PeersConfig,
};
use reth_eth_wire::{
error::{EthStreamError, P2PStreamError},
error::{EthStreamError, P2PHandshakeError, P2PStreamError},
DisconnectReason,
};
use reth_net_common::ban_list::BanList;
@@ -1045,40 +1078,59 @@ mod test {
}
#[tokio::test]
async fn test_reputation_change() {
let peer = PeerId::random();
async fn test_internally_closed_incoming() {
let socket_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 2)), 8008);
let mut peers = PeersManager::default();
peers.add_discovered_node(peer, socket_addr);
match event!(peers) {
PeerAction::Connect { peer_id, remote_addr } => {
assert_eq!(peer_id, peer);
assert_eq!(remote_addr, socket_addr);
}
_ => unreachable!(),
}
assert!(peers.on_incoming_pending_session(socket_addr.ip()).is_ok());
assert_eq!(peers.connection_info.num_inbound, 1);
peers.on_incoming_pending_session_rejected_internally();
assert_eq!(peers.connection_info.num_inbound, 0);
}
peers.apply_reputation_change(&peer, ReputationChangeKind::BadProtocol);
#[tokio::test]
async fn test_closed_incoming() {
let socket_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 2)), 8008);
let mut peers = PeersManager::default();
let p = peers.peers.get(&peer).unwrap();
assert!(p.is_banned());
assert!(peers.on_incoming_pending_session(socket_addr.ip()).is_ok());
assert_eq!(peers.connection_info.num_inbound, 1);
peers.on_incoming_pending_session_gracefully_closed();
assert_eq!(peers.connection_info.num_inbound, 0);
}
match event!(peers) {
PeerAction::Disconnect { peer_id, .. } => {
assert_eq!(peer_id, peer);
}
_ => {
unreachable!()
}
}
#[tokio::test]
async fn test_dropped_incoming() {
let socket_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 2)), 8008);
let ban_duration = Duration::from_millis(500);
let config = PeersConfig { ban_duration, ..Default::default() };
let mut peers = PeersManager::new(config);
match event!(peers) {
PeerAction::BanPeer { peer_id } => {
assert_eq!(peer_id, peer);
}
_ => unreachable!(),
}
assert!(peers.on_incoming_pending_session(socket_addr.ip()).is_ok());
assert_eq!(peers.connection_info.num_inbound, 1);
let err = PendingSessionHandshakeError::Eth(EthStreamError::P2PStreamError(
P2PStreamError::HandshakeError(P2PHandshakeError::Disconnected(
DisconnectReason::UselessPeer,
)),
));
peers.on_incoming_pending_session_dropped(socket_addr, &err);
assert_eq!(peers.connection_info.num_inbound, 0);
assert!(peers.ban_list.is_banned_ip(&socket_addr.ip()));
assert!(peers.on_incoming_pending_session(socket_addr.ip()).is_err());
// unbanned after timeout
tokio::time::sleep(ban_duration).await;
poll_fn(|cx| {
let _ = peers.poll(cx);
Poll::Ready(())
})
.await;
assert!(!peers.ban_list.is_banned_ip(&socket_addr.ip()));
assert!(peers.on_incoming_pending_session(socket_addr.ip()).is_ok());
}
#[tokio::test]
@@ -1176,7 +1228,7 @@ mod test {
let ban_list = BanList::new(HashSet::new(), vec![ip]);
let config = PeersConfig::default().with_ban_list(ban_list);
let mut peer_manager = PeersManager::new(config);
let a = peer_manager.on_inbound_pending_session(socket_addr.ip());
let a = peer_manager.on_incoming_pending_session(socket_addr.ip());
// because we have no active peers this should be fine for testings
match a {
Ok(_) => panic!(),

View File

@@ -280,11 +280,13 @@ where
self.queued_messages.push_back(StateAction::Disconnect { peer_id, reason });
}
PeerAction::DisconnectBannedIncoming { peer_id } => {
// TODO: can IP ban
self.state_fetcher.on_pending_disconnect(&peer_id);
self.queued_messages.push_back(StateAction::Disconnect { peer_id, reason: None });
}
PeerAction::DiscoveryBan { peer_id, ip_addr } => self.ban_discovery(peer_id, ip_addr),
PeerAction::DiscoveryBanPeerId { peer_id, ip_addr } => {
self.ban_discovery(peer_id, ip_addr)
}
PeerAction::DiscoveryBanIp { ip_addr } => self.ban_ip_discovery(ip_addr),
PeerAction::BanPeer { .. } => {}
PeerAction::UnBanPeer { .. } => {}
}

View File

@@ -179,8 +179,9 @@ where
return Some(SwarmEvent::TcpListenerClosed { remote_addr: address })
}
ListenerEvent::Incoming { stream, remote_addr } => {
// ensure we can handle an incoming connection from this address
if let Err(err) =
self.state_mut().peers_mut().on_inbound_pending_session(remote_addr.ip())
self.state_mut().peers_mut().on_incoming_pending_session(remote_addr.ip())
{
match err {
InboundConnectionError::IpBanned => {
@@ -202,6 +203,9 @@ where
}
Err(err) => {
warn!(target: "net", ?err, "Incoming connection rejected");
self.state_mut()
.peers_mut()
.on_incoming_pending_session_rejected_internally();
}
}
}