From 39a667bbfeeb1d6d0d8860814030df5abff680c4 Mon Sep 17 00:00:00 2001 From: greg <82421016+greged93@users.noreply.github.com> Date: Tue, 5 Nov 2024 15:58:16 +0100 Subject: [PATCH] feat: graceful incoming connection closing (#12282) Co-authored-by: Matthias Seitz --- crates/net/network/src/session/mod.rs | 46 +++++++++++++++++++++++++++ crates/net/network/src/swarm.rs | 11 +++++-- 2 files changed, 54 insertions(+), 3 deletions(-) diff --git a/crates/net/network/src/session/mod.rs b/crates/net/network/src/session/mod.rs index 712f076b47..30b1cda9da 100644 --- a/crates/net/network/src/session/mod.rs +++ b/crates/net/network/src/session/mod.rs @@ -110,6 +110,8 @@ pub struct SessionManager { active_session_rx: ReceiverStream, /// Additional `RLPx` sub-protocols to be used by the session manager. extra_protocols: RlpxSubProtocols, + /// Tracks the ongoing graceful disconnections attempts for incoming connections. + disconnections_counter: DisconnectionsCounter, /// Metrics for the session manager. metrics: SessionManagerMetrics, } @@ -151,6 +153,7 @@ impl SessionManager { active_session_tx: MeteredPollSender::new(active_session_tx, "network_active_session"), active_session_rx: ReceiverStream::new(active_session_rx), extra_protocols, + disconnections_counter: Default::default(), metrics: Default::default(), } } @@ -376,6 +379,35 @@ impl SessionManager { Some(session) } + /// Try to gracefully disconnect an incoming connection by initiating a ECIES connection and + /// sending a disconnect. If [`SessionManager`] is at capacity for ongoing disconnections, will + /// simply drop the incoming connection. + pub(crate) fn try_disconnect_incoming_connection( + &self, + stream: TcpStream, + reason: DisconnectReason, + ) { + if !self.disconnections_counter.has_capacity() { + // drop the connection if we don't have capacity for gracefully disconnecting + return + } + + let guard = self.disconnections_counter.clone(); + let secret_key = self.secret_key; + + self.spawn(async move { + trace!( + target: "net::session", + "gracefully disconnecting incoming connection" + ); + if let Ok(stream) = get_ecies_stream(stream, secret_key, Direction::Incoming).await { + let mut unauth = UnauthedP2PStream::new(stream); + let _ = unauth.send_disconnect(reason).await; + drop(guard); + } + }); + } + /// This polls all the session handles and returns [`SessionEvent`]. /// /// Active sessions are prioritized. @@ -615,6 +647,20 @@ impl SessionManager { } } +/// A counter for ongoing graceful disconnections attempts. +#[derive(Default, Debug, Clone)] +struct DisconnectionsCounter(Arc<()>); + +impl DisconnectionsCounter { + const MAX_CONCURRENT_GRACEFUL_DISCONNECTIONS: usize = 15; + + /// Returns true if the [`DisconnectionsCounter`] still has capacity + /// for an additional graceful disconnection. + fn has_capacity(&self) -> bool { + Arc::strong_count(&self.0) <= Self::MAX_CONCURRENT_GRACEFUL_DISCONNECTIONS + } +} + /// Events produced by the [`SessionManager`] #[derive(Debug)] pub enum SessionEvent { diff --git a/crates/net/network/src/swarm.rs b/crates/net/network/src/swarm.rs index 0be7ae1c1b..c1fe9f9e23 100644 --- a/crates/net/network/src/swarm.rs +++ b/crates/net/network/src/swarm.rs @@ -8,7 +8,8 @@ use std::{ use futures::Stream; use reth_eth_wire::{ - capability::CapabilityMessage, errors::EthStreamError, Capabilities, EthVersion, Status, + capability::CapabilityMessage, errors::EthStreamError, Capabilities, DisconnectReason, + EthVersion, Status, }; use reth_network_api::PeerRequestSender; use reth_network_peers::PeerId; @@ -32,7 +33,7 @@ use crate::{ /// [`SessionManager`]. Outgoing connections are either initiated on demand or triggered by the /// [`NetworkState`] and also delegated to the [`NetworkState`]. /// -/// Following diagram gives displays the dataflow contained in the [`Swarm`] +/// Following diagram displays the dataflow contained in the [`Swarm`] /// /// The [`ConnectionListener`] yields incoming [`TcpStream`]s from peers that are spawned as session /// tasks. After a successful `RLPx` authentication, the task is ready to accept ETH requests or @@ -70,7 +71,7 @@ impl Swarm { Self { incoming, sessions, state } } - /// Adds an additional protocol handler to the `RLPx` sub-protocol list. + /// Adds a protocol handler to the `RLPx` sub-protocol list. pub(crate) fn add_rlpx_sub_protocol(&mut self, protocol: impl IntoRlpxSubProtocol) { self.sessions_mut().add_rlpx_sub_protocol(protocol); } @@ -201,6 +202,10 @@ impl Swarm { } InboundConnectionError::ExceedsCapacity => { trace!(target: "net", ?remote_addr, "No capacity for incoming connection"); + self.sessions.try_disconnect_incoming_connection( + stream, + DisconnectReason::TooManyPeers, + ); } } return None