From 37809ce774450568e2fa185d095f34a62a60a11d Mon Sep 17 00:00:00 2001
From: Matthias Seitz
Date: Fri, 25 Nov 2022 18:56:49 +0100
Subject: [PATCH] feat(net): add session limits (#253)

---
 crates/net/network/src/config.rs         |   6 +
 crates/net/network/src/session/config.rs | 182 +++++++++++++++++++++++
 crates/net/network/src/session/handle.rs |   4 +
 crates/net/network/src/session/mod.rs    |  84 +++++------
 crates/net/network/src/state.rs          |  14 +-
 crates/net/network/src/swarm.rs          |  31 ++--
 6 files changed, 242 insertions(+), 79 deletions(-)
 create mode 100644 crates/net/network/src/session/config.rs

diff --git a/crates/net/network/src/config.rs b/crates/net/network/src/config.rs
index c7db2f4b9b..16c53d64de 100644
--- a/crates/net/network/src/config.rs
+++ b/crates/net/network/src/config.rs
@@ -117,6 +117,12 @@ impl NetworkConfigBuilder {
         }
     }
 
+    /// Sets a custom config for how sessions are handled.
+    pub fn sessions_config(mut self, config: SessionsConfig) -> Self {
+        self.sessions_config = Some(config);
+        self
+    }
+
     /// Sets the genesis hash for the network.
     pub fn genesis_hash(mut self, genesis_hash: H256) -> Self {
         self.genesis_hash = genesis_hash;
diff --git a/crates/net/network/src/session/config.rs b/crates/net/network/src/session/config.rs
new file mode 100644
index 0000000000..0dfcc1f50b
--- /dev/null
+++ b/crates/net/network/src/session/config.rs
@@ -0,0 +1,182 @@
+//! Configuration types for [`SessionManager`]
+
+use crate::session::{Direction, ExceedsSessionLimit};
+
+/// Configuration options when creating a [`SessionManager`].
+pub struct SessionsConfig {
+    /// Size of the session command buffer (per session task).
+    pub session_command_buffer: usize,
+    /// Size of the session event channel buffer.
+    pub session_event_buffer: usize,
+    /// Limits to enforce.
+    ///
+    /// By default, no limits will be enforced.
+    pub limits: SessionLimits,
+}
+
+impl Default for SessionsConfig {
+    fn default() -> Self {
+        SessionsConfig {
+            // This should provide sufficient slots for handling commands sent to the session
+            // task, since the manager is the sender.
+            session_command_buffer: 10,
+            // This should be greater since the manager is the receiver. The total size will be
+            // `buffer + num sessions`. Each session can therefore fit at least 1 message in the
+            // channel. The buffer size is additional capacity. The channel is always drained on
+            // `poll`.
+            session_event_buffer: 64,
+            limits: Default::default(),
+        }
+    }
+}
+
+impl SessionsConfig {
+    /// Sets the buffer size for the bounded communication channel between the manager and its
+    /// sessions for events emitted by the sessions.
+    ///
+    /// It is expected that the background session tasks will stall if they outpace the manager.
+    /// The buffer size provides backpressure on the network I/O.
+    pub fn with_session_event_buffer(mut self, n: usize) -> Self {
+        self.session_event_buffer = n;
+        self
+    }
+}
+
+/// Limits for sessions.
+///
+/// By default, no session limits will be enforced.
+#[derive(Debug, Clone, Default)]
+pub struct SessionLimits {
+    max_pending_inbound: Option<u32>,
+    max_pending_outbound: Option<u32>,
+    max_established_inbound: Option<u32>,
+    max_established_outbound: Option<u32>,
+}
+
+impl SessionLimits {
+    /// Sets the maximum number of pending incoming sessions.
+    pub fn with_max_pending_inbound(mut self, limit: u32) -> Self {
+        self.max_pending_inbound = Some(limit);
+        self
+    }
+
+    /// Sets the maximum number of pending outbound sessions.
+    pub fn with_max_pending_outbound(mut self, limit: u32) -> Self {
+        self.max_pending_outbound = Some(limit);
+        self
+    }
+
+    /// Sets the maximum number of active inbound sessions.
+    pub fn with_max_established_inbound(mut self, limit: u32) -> Self {
+        self.max_established_inbound = Some(limit);
+        self
+    }
+
+    /// Sets the maximum number of active outbound sessions.
+    pub fn with_max_established_outbound(mut self, limit: u32) -> Self {
+        self.max_established_outbound = Some(limit);
+        self
+    }
+}
+
+/// Keeps track of all sessions.
+#[derive(Debug, Clone)]
+pub struct SessionCounter {
+    /// Limits to enforce.
+    limits: SessionLimits,
+    /// Number of pending incoming sessions.
+    pending_inbound: u32,
+    /// Number of pending outgoing sessions.
+    pending_outbound: u32,
+    /// Number of active inbound sessions.
+    active_inbound: u32,
+    /// Number of active outbound sessions.
+    active_outbound: u32,
+}
+
+// === impl SessionCounter ===
+
+impl SessionCounter {
+    pub(crate) fn new(limits: SessionLimits) -> Self {
+        Self {
+            limits,
+            pending_inbound: 0,
+            pending_outbound: 0,
+            active_inbound: 0,
+            active_outbound: 0,
+        }
+    }
+
+    pub(crate) fn inc_pending_inbound(&mut self) {
+        self.pending_inbound += 1;
+    }
+
+    pub(crate) fn inc_pending_outbound(&mut self) {
+        self.pending_outbound += 1;
+    }
+
+    pub(crate) fn dec_pending(&mut self, direction: &Direction) {
+        match direction {
+            Direction::Outgoing(_) => {
+                self.pending_outbound -= 1;
+            }
+            Direction::Incoming => {
+                self.pending_inbound -= 1;
+            }
+        }
+    }
+
+    pub(crate) fn inc_active(&mut self, direction: &Direction) {
+        match direction {
+            Direction::Outgoing(_) => {
+                self.active_outbound += 1;
+            }
+            Direction::Incoming => {
+                self.active_inbound += 1;
+            }
+        }
+    }
+
+    pub(crate) fn dec_active(&mut self, direction: &Direction) {
+        match direction {
+            Direction::Outgoing(_) => {
+                self.active_outbound -= 1;
+            }
+            Direction::Incoming => {
+                self.active_inbound -= 1;
+            }
+        }
+    }
+
+    pub(crate) fn ensure_pending_outbound(&self) -> Result<(), ExceedsSessionLimit> {
+        Self::ensure(self.pending_outbound, self.limits.max_pending_outbound)
+    }
+
+    pub(crate) fn ensure_pending_inbound(&self) -> Result<(), ExceedsSessionLimit> {
+        Self::ensure(self.pending_inbound, self.limits.max_pending_inbound)
+    }
+
+    fn ensure(current: u32, limit: Option<u32>) -> Result<(), ExceedsSessionLimit> {
+        if let Some(limit) = limit {
+            if current >= limit {
+                return Err(ExceedsSessionLimit(limit))
+            }
+        }
+        Ok(())
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+
+    #[test]
+    fn test_limits() {
+        let mut limits = SessionCounter::new(SessionLimits::default().with_max_pending_inbound(2));
+        assert!(limits.ensure_pending_outbound().is_ok());
+        limits.inc_pending_inbound();
+        assert!(limits.ensure_pending_inbound().is_ok());
+        limits.inc_pending_inbound();
+        assert!(limits.ensure_pending_inbound().is_err());
+    }
+}
diff --git a/crates/net/network/src/session/handle.rs b/crates/net/network/src/session/handle.rs
index 72bfc02b75..0aaa672448 100644
--- a/crates/net/network/src/session/handle.rs
+++ b/crates/net/network/src/session/handle.rs
@@ -24,6 +24,8 @@ use tokio::{
 pub(crate) struct PendingSessionHandle {
     /// Can be used to tell the session to disconnect the connection/abort the handshake process.
     pub(crate) disconnect_tx: oneshot::Sender<()>,
+    /// The direction of the session
+    pub(crate) direction: Direction,
 }
 
 /// An established session with a remote peer.
@@ -32,6 +34,8 @@ pub(crate) struct PendingSessionHandle {
 /// be performed: chain synchronization, block propagation and transaction exchange.
 #[derive(Debug)]
 pub(crate) struct ActiveSessionHandle {
+    /// The direction of the session
+    pub(crate) direction: Direction,
     /// The assigned id for this session
     pub(crate) session_id: SessionId,
     /// The identifier of the remote peer
diff --git a/crates/net/network/src/session/mod.rs b/crates/net/network/src/session/mod.rs
index 66523590a4..05e1546fb5 100644
--- a/crates/net/network/src/session/mod.rs
+++ b/crates/net/network/src/session/mod.rs
@@ -39,7 +39,10 @@ use tokio_stream::wrappers::ReceiverStream;
 use tracing::{instrument, trace, warn};
 
 mod active;
+mod config;
 mod handle;
+use crate::session::config::SessionCounter;
+pub use config::SessionsConfig;
 
 /// Internal identifier for active sessions.
 #[derive(Debug, Clone, Copy, PartialOrd, PartialEq, Eq, Hash)]
@@ -50,6 +53,8 @@ pub struct SessionId(usize);
 pub(crate) struct SessionManager {
     /// Tracks the identifier for the next session.
     next_id: usize,
+    /// Keeps track of all sessions
+    counter: SessionCounter,
     /// The secret key used for authenticating sessions.
     secret_key: SecretKey,
     /// The node id of node
@@ -108,6 +113,7 @@ impl SessionManager {
 
         Self {
             next_id: 0,
+            counter: SessionCounter::new(config.limits),
             secret_key,
             peer_id,
             status,
@@ -155,7 +161,8 @@ impl SessionManager {
         stream: TcpStream,
         remote_addr: SocketAddr,
     ) -> Result<SessionId, ExceedsSessionLimit> {
-        // TODO(mattsse): enforce limits
+        self.counter.ensure_pending_inbound()?;
+
         let session_id = self.next_id();
         let (disconnect_tx, disconnect_rx) = oneshot::channel();
         let pending_events = self.pending_sessions_tx.clone();
@@ -171,8 +178,9 @@ impl SessionManager {
             self.fork_filter.clone(),
         ));
 
-        let handle = PendingSessionHandle { disconnect_tx };
+        let handle = PendingSessionHandle { disconnect_tx, direction: Direction::Incoming };
         self.pending_sessions.insert(session_id, handle);
+        self.counter.inc_pending_inbound();
         Ok(session_id)
     }
 
@@ -193,8 +201,10 @@ impl SessionManager {
             self.fork_filter.clone(),
         ));
 
-        let handle = PendingSessionHandle { disconnect_tx };
+        let handle =
+            PendingSessionHandle { disconnect_tx, direction: Direction::Outgoing(remote_peer_id) };
         self.pending_sessions.insert(session_id, handle);
+        self.counter.inc_pending_outbound();
     }
 
     /// Initiates a shutdown of the channel.
@@ -214,6 +224,20 @@ impl SessionManager {
         }
     }
 
+    /// Removes the [`PendingSessionHandle`] if it exists.
+    fn remove_pending_session(&mut self, id: &SessionId) -> Option<PendingSessionHandle> {
+        let session = self.pending_sessions.remove(id)?;
+        self.counter.dec_pending(&session.direction);
+        Some(session)
+    }
+
+    /// Removes the [`ActiveSessionHandle`] if it exists.
+    fn remove_active_session(&mut self, id: &PeerId) -> Option<ActiveSessionHandle> {
+        let session = self.active_sessions.remove(id)?;
+        self.counter.dec_active(&session.direction);
+        Some(session)
+    }
+
     /// This polls all the session handles and returns [`SessionEvent`].
     ///
     /// Active sessions are prioritized.
@@ -232,7 +256,7 @@ impl SessionManager {
                         ?peer_id,
                         "gracefully disconnected active session."
); - let _ = self.active_sessions.remove(&peer_id); + self.remove_active_session(&peer_id); Poll::Ready(SessionEvent::Disconnected { peer_id, remote_addr }) } ActiveSessionMessage::ClosedOnConnectionError { @@ -241,7 +265,7 @@ impl SessionManager { error, } => { trace!(target : "net::session", ?peer_id, ?error,"closed session."); - let _ = self.active_sessions.remove(&peer_id); + self.remove_active_session(&peer_id); Poll::Ready(SessionEvent::SessionClosedOnConnectionError { remote_addr, peer_id, @@ -287,7 +311,7 @@ impl SessionManager { direction, } => { // move from pending to established. - let _ = self.pending_sessions.remove(&session_id); + self.remove_pending_session(&session_id); let (commands_to_session, commands_rx) = mpsc::channel(self.session_command_buffer); @@ -314,6 +338,7 @@ impl SessionManager { self.spawn(session); let handle = ActiveSessionHandle { + direction, session_id, remote_id: peer_id, established: Instant::now(), @@ -322,6 +347,7 @@ impl SessionManager { }; self.active_sessions.insert(peer_id, handle); + self.counter.inc_active(&direction); return Poll::Ready(SessionEvent::SessionEstablished { peer_id, @@ -339,7 +365,7 @@ impl SessionManager { ?remote_addr, "disconnected pending session" ); - let _ = self.pending_sessions.remove(&session_id); + self.remove_pending_session(&session_id); return match direction { Direction::Incoming => { Poll::Ready(SessionEvent::IncomingPendingSessionClosed { @@ -370,7 +396,7 @@ impl SessionManager { ?peer_id, "connection refused" ); - let _ = self.pending_sessions.remove(&session_id); + self.remove_pending_session(&session_id); return Poll::Ready(SessionEvent::OutgoingPendingSessionClosed { remote_addr, peer_id, @@ -383,7 +409,7 @@ impl SessionManager { error, direction, } => { - let _ = self.pending_sessions.remove(&session_id); + self.remove_pending_session(&session_id); warn!( target : "net::session", ?error, @@ -391,7 +417,7 @@ impl SessionManager { ?remote_addr, "ecies auth failed" ); - let _ = self.pending_sessions.remove(&session_id); + self.remove_pending_session(&session_id); return match direction { Direction::Incoming => { Poll::Ready(SessionEvent::IncomingPendingSessionClosed { @@ -415,41 +441,6 @@ impl SessionManager { } } -/// Configuration options when creating a [`SessionsManager`]. -pub struct SessionsConfig { - /// Size of the session command buffer (per session task). - pub session_command_buffer: usize, - /// Size of the session event channel buffer. - pub session_event_buffer: usize, -} - -impl Default for SessionsConfig { - fn default() -> Self { - SessionsConfig { - // This should be sufficient to slots for handling commands sent to the session task, - // since the manager is the sender. - session_command_buffer: 10, - // This should be greater since the manager is the receiver. The total size will be - // `buffer + num sessions`. Each session can therefor fit at least 1 message in the - // channel. The buffer size is additional capacity. The channel is always drained on - // `poll`. - session_event_buffer: 64, - } - } -} - -impl SessionsConfig { - /// Sets the buffer size for the bounded communication channel between the manager and its - /// sessions for events emitted by the sessions. - /// - /// It is expected, that the background session task will stall if they outpace the manager. The - /// buffer size provides backpressure on the network I/O. 
-    pub fn with_session_event_buffer(mut self, n: usize) -> Self {
-        self.session_event_buffer = n;
-        self
-    }
-}
-
 /// Events produced by the [`SessionManager`]
 pub(crate) enum SessionEvent {
     /// A new session was successfully authenticated.
@@ -509,7 +500,7 @@ pub(crate) enum SessionEvent {
 /// accepted.
 #[derive(Debug, Clone, thiserror::Error)]
 #[error("Session limit reached {0}")]
-pub struct ExceedsSessionLimit(usize);
+pub struct ExceedsSessionLimit(pub(crate) u32);
 
 /// Starts the authentication process for a connection initiated by a remote peer.
 ///
@@ -598,6 +589,7 @@ impl Direction {
     }
 }
 
+/// Authenticates a session
 async fn authenticate(
     disconnect_rx: oneshot::Receiver<()>,
     events: mpsc::Sender<PendingSessionEvent>,
diff --git a/crates/net/network/src/state.rs b/crates/net/network/src/state.rs
index c16481f4ea..5c416b7151 100644
--- a/crates/net/network/src/state.rs
+++ b/crates/net/network/src/state.rs
@@ -100,8 +100,7 @@ where
         capabilities: Arc<Capabilities>,
         status: Status,
         request_tx: PeerRequestSender,
-    ) -> Result<(), AddSessionError> {
-        // TODO add capacity check
+    ) {
         debug_assert!(!self.connected_peers.contains_key(&peer), "Already connected; not possible");
 
         // find the corresponding block number
@@ -119,8 +118,6 @@ where
                 blocks: LruCache::new(NonZeroUsize::new(PEER_BLOCK_CACHE_LIMIT).unwrap()),
             },
         );
-
-        Ok(())
     }
 
     /// Event hook for a disconnected session for the peer.
@@ -424,12 +421,3 @@ pub(crate) enum StateAction {
         reason: Option<DisconnectReason>,
     },
 }
-
-#[derive(Debug, thiserror::Error)]
-pub enum AddSessionError {
-    #[error("No capacity for new sessions")]
-    AtCapacity {
-        /// The peer of the session
-        peer: PeerId,
-    },
-}
diff --git a/crates/net/network/src/swarm.rs b/crates/net/network/src/swarm.rs
index 1dbc0b3289..e09993b138 100644
--- a/crates/net/network/src/swarm.rs
+++ b/crates/net/network/src/swarm.rs
@@ -3,13 +3,12 @@ use crate::{
     listener::{ConnectionListener, ListenerEvent},
     message::{PeerMessage, PeerRequestSender},
     session::{Direction, SessionEvent, SessionId, SessionManager},
-    state::{AddSessionError, NetworkState, StateAction},
+    state::{NetworkState, StateAction},
 };
 use futures::Stream;
 use reth_eth_wire::{
     capability::{Capabilities, CapabilityMessage},
     error::EthStreamError,
-    DisconnectReason,
 };
 use reth_interfaces::provider::BlockProvider;
 use reth_primitives::PeerId;
@@ -88,29 +87,21 @@ where
                 status,
                 messages,
                 direction,
-            } => match self.state.on_session_activated(
-                peer_id,
-                capabilities.clone(),
-                status,
-                messages.clone(),
-            ) {
-                Ok(_) => Some(SwarmEvent::SessionEstablished {
+            } => {
+                self.state.on_session_activated(
+                    peer_id,
+                    capabilities.clone(),
+                    status,
+                    messages.clone(),
+                );
+                Some(SwarmEvent::SessionEstablished {
                     peer_id,
                     remote_addr,
                     capabilities,
                     messages,
                     direction,
-                }),
-                Err(err) => {
-                    match err {
-                        AddSessionError::AtCapacity { peer } => {
-                            self.sessions.disconnect(peer, Some(DisconnectReason::TooManyPeers));
-                        }
-                    };
-                    self.state.peers_mut().on_disconnected(&peer_id);
-                    None
-                }
-            },
+                })
+            }
             SessionEvent::ValidMessage { peer_id, message } => {
                 Some(SwarmEvent::ValidMessage { peer_id, message })
             }
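
A rough usage sketch of the configuration surface this patch adds, not part of the diff
itself. It relies only on items introduced above (`SessionLimits`, `SessionsConfig`,
`with_session_event_buffer`, and `NetworkConfigBuilder::sessions_config`), but the module
paths are assumptions, and it presumes `SessionLimits` is re-exported next to
`SessionsConfig`, which the patch currently does not do (`pub use config::SessionsConfig;`).

    // Hypothetical wiring, written as if inside the reth network crate; paths are assumed.
    use crate::{
        config::NetworkConfigBuilder,              // assumed module path
        session::{SessionLimits, SessionsConfig},  // assumes SessionLimits gets re-exported
    };

    /// Attaches session limits to an existing network config builder.
    fn with_session_limits(builder: NetworkConfigBuilder) -> NetworkConfigBuilder {
        // Cap in-flight handshakes and established sessions per direction.
        let limits = SessionLimits::default()
            .with_max_pending_inbound(10)
            .with_max_pending_outbound(10)
            .with_max_established_inbound(30)
            .with_max_established_outbound(30);

        // Keep the default command buffer, grow the shared event channel, enforce the limits.
        let sessions = SessionsConfig { limits, ..SessionsConfig::default() }
            .with_session_event_buffer(256);

        builder.sessions_config(sessions)
    }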