mirror of
https://github.com/paradigmxyz/reth.git
synced 2026-04-08 03:01:12 -04:00
feat: autoscale session event capacity (#7149)
This commit is contained in:
@@ -1386,6 +1386,11 @@ impl PeersConfig {
|
||||
self
|
||||
}
|
||||
|
||||
/// Returns the maximum number of peers, inbound and outbound.
|
||||
pub const fn max_peers(&self) -> usize {
|
||||
self.connection_info.max_outbound + self.connection_info.max_inbound
|
||||
}
|
||||
|
||||
/// Read from file nodes available at launch. Ignored if None.
|
||||
pub fn with_basic_nodes_from_file(
|
||||
self,
|
||||
|
||||
@@ -16,6 +16,16 @@ pub const INITIAL_REQUEST_TIMEOUT: Duration = Duration::from_secs(20);
|
||||
/// This is the time a peer has to answer a response.
|
||||
pub const PROTOCOL_BREACH_REQUEST_TIMEOUT: Duration = Duration::from_secs(2 * 60);
|
||||
|
||||
/// The default maximum number of peers.
|
||||
const DEFAULT_MAX_PEERS: usize =
|
||||
DEFAULT_MAX_COUNT_PEERS_OUTBOUND as usize + DEFAULT_MAX_COUNT_PEERS_INBOUND as usize;
|
||||
|
||||
/// The default session event buffer size.
|
||||
///
|
||||
/// The actual capacity of the event channel will be `buffer + num sessions`.
|
||||
/// With maxed out peers, this will allow for 3 messages per session (average)
|
||||
const DEFAULT_SESSION_EVENT_BUFFER_SIZE: usize = DEFAULT_MAX_PEERS * 2;
|
||||
|
||||
/// Configuration options when creating a [SessionManager](crate::session::SessionManager).
|
||||
#[derive(Debug, Clone, PartialEq, Eq)]
|
||||
#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
|
||||
@@ -52,9 +62,7 @@ impl Default for SessionsConfig {
|
||||
// `poll`.
|
||||
// The default is twice the maximum number of available slots, if all slots are occupied
|
||||
// the buffer will have capacity for 3 messages per session (average).
|
||||
session_event_buffer: (DEFAULT_MAX_COUNT_PEERS_OUTBOUND as usize +
|
||||
DEFAULT_MAX_COUNT_PEERS_INBOUND as usize) *
|
||||
2,
|
||||
session_event_buffer: DEFAULT_SESSION_EVENT_BUFFER_SIZE,
|
||||
limits: Default::default(),
|
||||
initial_internal_request_timeout: INITIAL_REQUEST_TIMEOUT,
|
||||
protocol_breach_request_timeout: PROTOCOL_BREACH_REQUEST_TIMEOUT,
|
||||
@@ -72,6 +80,23 @@ impl SessionsConfig {
|
||||
self.session_event_buffer = n;
|
||||
self
|
||||
}
|
||||
|
||||
/// Helper function to set the buffer size for the bounded communication channel between the
|
||||
/// manager and its sessions for events emitted by the sessions.
|
||||
///
|
||||
/// This scales the buffer size based on the configured number of peers, where the base line is
|
||||
/// the default buffer size.
|
||||
///
|
||||
/// If the number of peers is greater than the default, the buffer size will be scaled up to
|
||||
/// match the default `buffer size / max peers` ratio.
|
||||
///
|
||||
/// Note: This is capped at 10 times the default buffer size.
|
||||
pub fn with_upscaled_event_buffer(mut self, num_peers: usize) -> Self {
|
||||
if num_peers > DEFAULT_MAX_PEERS {
|
||||
self.session_event_buffer = (num_peers * 2).min(DEFAULT_SESSION_EVENT_BUFFER_SIZE * 10);
|
||||
}
|
||||
self
|
||||
}
|
||||
}
|
||||
|
||||
/// Limits for sessions.
|
||||
@@ -212,4 +237,15 @@ mod tests {
|
||||
limits.inc_pending_inbound();
|
||||
assert!(limits.ensure_pending_inbound().is_err());
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn scale_session_event_buffer() {
|
||||
let config = SessionsConfig::default().with_upscaled_event_buffer(10);
|
||||
assert_eq!(config.session_event_buffer, DEFAULT_SESSION_EVENT_BUFFER_SIZE);
|
||||
let default_ration = config.session_event_buffer / DEFAULT_MAX_PEERS;
|
||||
|
||||
let config = SessionsConfig::default().with_upscaled_event_buffer(DEFAULT_MAX_PEERS * 2);
|
||||
let expected_ration = config.session_event_buffer / (DEFAULT_MAX_PEERS * 2);
|
||||
assert_eq!(default_ration, expected_ration);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -11,7 +11,7 @@ use reth_network::{
|
||||
DEFAULT_SOFT_LIMIT_BYTE_SIZE_POOLED_TRANSACTIONS_RESP_ON_PACK_GET_POOLED_TRANSACTIONS_REQ,
|
||||
SOFT_LIMIT_BYTE_SIZE_POOLED_TRANSACTIONS_RESPONSE,
|
||||
},
|
||||
HelloMessageWithProtocols, NetworkConfigBuilder,
|
||||
HelloMessageWithProtocols, NetworkConfigBuilder, SessionsConfig,
|
||||
};
|
||||
use reth_primitives::{mainnet_nodes, ChainSpec, NodeRecord};
|
||||
use secp256k1::SecretKey;
|
||||
@@ -114,7 +114,7 @@ impl NetworkArgs {
|
||||
let peers_file = self.peers_file.clone().unwrap_or(default_peers_file);
|
||||
|
||||
// Configure peer connections
|
||||
let peer_config = config
|
||||
let peers_config = config
|
||||
.peers
|
||||
.clone()
|
||||
.with_max_inbound_opt(self.max_inbound_peers)
|
||||
@@ -131,7 +131,10 @@ impl NetworkArgs {
|
||||
// Configure basic network stack
|
||||
let mut network_config_builder = config
|
||||
.network_config(self.nat, self.persistent_peers_file(peers_file), secret_key)
|
||||
.peer_config(peer_config)
|
||||
.sessions_config(
|
||||
SessionsConfig::default().with_upscaled_event_buffer(peers_config.max_peers()),
|
||||
)
|
||||
.peer_config(peers_config)
|
||||
.boot_nodes(self.bootnodes.clone().unwrap_or(chain_bootnodes))
|
||||
.chain_spec(chain_spec)
|
||||
.transactions_manager_config(transactions_manager_config);
|
||||
|
||||
Reference in New Issue
Block a user