From 8835443c35fb0ceba78486b4c438cc274f52b50c Mon Sep 17 00:00:00 2001 From: Dan Cline <6798349+Rjected@users.noreply.github.com> Date: Wed, 26 Apr 2023 10:58:17 -0400 Subject: [PATCH] feat: include SessionsConfig in reth.toml (#2416) Co-authored-by: Matthias Seitz --- crates/net/network/src/lib.rs | 2 +- crates/net/network/src/session/config.rs | 4 +- crates/net/network/src/session/mod.rs | 55 +++++++++++++----------- crates/staged-sync/src/config.rs | 9 +++- 4 files changed, 39 insertions(+), 31 deletions(-) diff --git a/crates/net/network/src/lib.rs b/crates/net/network/src/lib.rs index 66ec9d1cc9..79971e7699 100644 --- a/crates/net/network/src/lib.rs +++ b/crates/net/network/src/lib.rs @@ -145,6 +145,6 @@ pub use manager::{NetworkEvent, NetworkManager}; pub use message::PeerRequest; pub use network::NetworkHandle; pub use peers::PeersConfig; -pub use session::PeerInfo; +pub use session::{PeerInfo, SessionsConfig}; pub use reth_eth_wire::DisconnectReason; diff --git a/crates/net/network/src/session/config.rs b/crates/net/network/src/session/config.rs index 70ffd04889..41488b3ee5 100644 --- a/crates/net/network/src/session/config.rs +++ b/crates/net/network/src/session/config.rs @@ -17,7 +17,7 @@ pub const INITIAL_REQUEST_TIMEOUT: Duration = Duration::from_secs(20); pub const PROTOCOL_BREACH_REQUEST_TIMEOUT: Duration = Duration::from_secs(2 * 60); /// Configuration options when creating a [SessionManager](crate::session::SessionManager). -#[derive(Debug)] +#[derive(Debug, Clone, PartialEq, Eq)] #[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))] pub struct SessionsConfig { /// Size of the session command buffer (per session task). @@ -74,7 +74,7 @@ impl SessionsConfig { /// Limits for sessions. /// /// By default, no session limits will be enforced -#[derive(Debug, Clone, Default)] +#[derive(Debug, Clone, Default, PartialEq, Eq)] #[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))] pub struct SessionLimits { max_pending_inbound: Option, diff --git a/crates/net/network/src/session/mod.rs b/crates/net/network/src/session/mod.rs index 213dae70bd..e0adb667d4 100644 --- a/crates/net/network/src/session/mod.rs +++ b/crates/net/network/src/session/mod.rs @@ -232,33 +232,36 @@ impl SessionManager { /// Starts a new pending session from the local node to the given remote node. pub(crate) fn dial_outbound(&mut self, remote_addr: SocketAddr, remote_peer_id: PeerId) { - let session_id = self.next_id(); - let (disconnect_tx, disconnect_rx) = oneshot::channel(); - let pending_events = self.pending_sessions_tx.clone(); - let secret_key = self.secret_key; - let hello_message = self.hello_message.clone(); - let fork_filter = self.fork_filter.clone(); - let status = self.status; - let band_with_meter = self.bandwidth_meter.clone(); - self.spawn(start_pending_outbound_session( - disconnect_rx, - pending_events, - session_id, - remote_addr, - remote_peer_id, - secret_key, - hello_message, - status, - fork_filter, - band_with_meter, - )); + // The error can be dropped because no dial will be made if it would exceed the limit + if self.counter.ensure_pending_outbound().is_ok() { + let session_id = self.next_id(); + let (disconnect_tx, disconnect_rx) = oneshot::channel(); + let pending_events = self.pending_sessions_tx.clone(); + let secret_key = self.secret_key; + let hello_message = self.hello_message.clone(); + let fork_filter = self.fork_filter.clone(); + let status = self.status; + let band_with_meter = self.bandwidth_meter.clone(); + self.spawn(start_pending_outbound_session( + disconnect_rx, + pending_events, + session_id, + remote_addr, + remote_peer_id, + secret_key, + hello_message, + status, + fork_filter, + band_with_meter, + )); - let handle = PendingSessionHandle { - disconnect_tx: Some(disconnect_tx), - direction: Direction::Outgoing(remote_peer_id), - }; - self.pending_sessions.insert(session_id, handle); - self.counter.inc_pending_outbound(); + let handle = PendingSessionHandle { + disconnect_tx: Some(disconnect_tx), + direction: Direction::Outgoing(remote_peer_id), + }; + self.pending_sessions.insert(session_id, handle); + self.counter.inc_pending_outbound(); + } } /// Initiates a shutdown of the channel. diff --git a/crates/staged-sync/src/config.rs b/crates/staged-sync/src/config.rs index 7eabe81f18..e47d31551c 100644 --- a/crates/staged-sync/src/config.rs +++ b/crates/staged-sync/src/config.rs @@ -4,7 +4,7 @@ use reth_downloaders::{ bodies::bodies::BodiesDownloaderBuilder, headers::reverse_headers::ReverseHeadersDownloaderBuilder, }; -use reth_network::{NetworkConfigBuilder, PeersConfig}; +use reth_network::{NetworkConfigBuilder, PeersConfig, SessionsConfig}; use secp256k1::SecretKey; use serde::{Deserialize, Serialize}; use std::path::PathBuf; @@ -18,6 +18,8 @@ pub struct Config { pub stages: StageConfig, /// Configuration for the discovery service. pub peers: PeersConfig, + /// Configuration for peer sessions. + pub sessions: SessionsConfig, } impl Config { @@ -36,7 +38,10 @@ impl Config { let discv4 = Discv4Config::builder().external_ip_resolver(Some(nat_resolution_method)).clone(); - NetworkConfigBuilder::new(secret_key).peer_config(peer_config).discovery(discv4) + NetworkConfigBuilder::new(secret_key) + .sessions_config(self.sessions.clone()) + .peer_config(peer_config) + .discovery(discv4) } }