feat: NetworkManager hibernate command, NetworkHandle hibernate funct… (#6186)

Co-authored-by: Matthias Seitz <matthias.seitz@outlook.de>
This commit is contained in:
Luca Provini
2024-01-29 18:15:48 +01:00
committed by GitHub
parent abb7fe6fb6
commit 3faf9d87a3
4 changed files with 84 additions and 11 deletions

View File

@@ -29,7 +29,7 @@ use crate::{
protocol::IntoRlpxSubProtocol,
session::SessionManager,
state::NetworkState,
swarm::{NetworkConnectionState, Swarm, SwarmEvent},
swarm::{Swarm, SwarmEvent},
transactions::NetworkTransactionEvent,
FetchClient, NetworkBuilder,
};
@@ -224,7 +224,7 @@ where
Arc::clone(&num_active_peers),
);
let swarm = Swarm::new(incoming, sessions, state, NetworkConnectionState::default());
let swarm = Swarm::new(incoming, sessions, state);
let (to_manager_tx, from_handle_rx) = mpsc::unbounded_channel();
@@ -551,6 +551,14 @@ where
NetworkHandleMessage::DisconnectPeer(peer_id, reason) => {
self.swarm.sessions_mut().disconnect(peer_id, reason);
}
NetworkHandleMessage::SetNetworkState(net_state) => {
// Sets network connection state between Active and Hibernate.
// If hibernate stops the node to fill new outbound
// connections, this is beneficial for sync stages that do not require a network
// connection.
self.swarm.on_network_state_change(net_state);
}
NetworkHandleMessage::Shutdown(tx) => {
// Set connection status to `Shutdown`. Stops node to accept
// new incoming connections as well as sending connection requests to newly

View File

@@ -1,6 +1,6 @@
use crate::{
config::NetworkMode, discovery::DiscoveryEvent, manager::NetworkEvent, message::PeerRequest,
peers::PeersHandle, protocol::RlpxSubProtocol, FetchClient,
peers::PeersHandle, protocol::RlpxSubProtocol, swarm::NetworkConnectionState, FetchClient,
};
use async_trait::async_trait;
use parking_lot::Mutex;
@@ -152,6 +152,25 @@ impl NetworkHandle {
rx.await
}
/// Set network connection state to Active.
///
/// New outbound connections will be established if there's capacity.
pub fn set_network_active(&self) {
self.set_network_conn(NetworkConnectionState::Active);
}
/// Set network connection state to Hibernate.
///
/// No new outbound connections will be established.
pub fn set_network_hibernate(&self) {
self.set_network_conn(NetworkConnectionState::Hibernate);
}
/// Set network connection state.
fn set_network_conn(&self, network_conn: NetworkConnectionState) {
self.send_message(NetworkHandleMessage::SetNetworkState(network_conn));
}
/// Whether tx gossip is disabled
pub fn tx_gossip_disabled(&self) -> bool {
self.inner.tx_gossip_disabled
@@ -432,6 +451,8 @@ pub(crate) enum NetworkHandleMessage {
GetReputationById(PeerId, oneshot::Sender<Option<Reputation>>),
/// Initiates a graceful shutdown of the network via a oneshot sender.
Shutdown(oneshot::Sender<()>),
/// Sets the network state between hibernation and active.
SetNetworkState(NetworkConnectionState),
/// Adds a new listener for `DiscoveryEvent`.
DiscoveryListener(UnboundedSender<DiscoveryEvent>),
/// Adds an additional `RlpxSubProtocol`.

View File

@@ -6,6 +6,7 @@ use crate::{
DEFAULT_MAX_PEERS_OUTBOUND,
},
session::{Direction, PendingSessionHandshakeError},
swarm::NetworkConnectionState,
};
use futures::StreamExt;
use reth_eth_wire::{errors::EthStreamError, DisconnectReason};
@@ -114,6 +115,8 @@ pub struct PeersManager {
last_tick: Instant,
/// Maximum number of backoff attempts before we give up on a peer and dropping.
max_backoff_count: u32,
/// Tracks the connection state of the node
net_connection_state: NetworkConnectionState,
}
impl PeersManager {
@@ -166,6 +169,7 @@ impl PeersManager {
connect_trusted_nodes_only,
last_tick: Instant::now(),
max_backoff_count,
net_connection_state: NetworkConnectionState::default(),
}
}
@@ -688,7 +692,12 @@ impl PeersManager {
fn fill_outbound_slots(&mut self) {
self.tick();
// as long as there a slots available try to fill them with the best peers
if !self.net_connection_state.is_active() {
// nothing to fill
return
}
// as long as there a slots available fill them with the best peers
let mut new_outbound_dials = 1;
while self.connection_info.has_out_capacity() {
let action = {
@@ -719,6 +728,21 @@ impl PeersManager {
}
}
/// Keeps track of network state changes.
pub fn on_network_state_change(&mut self, state: NetworkConnectionState) {
self.net_connection_state = state;
}
/// Returns the current network connection state.
pub fn connection_state(&self) -> &NetworkConnectionState {
&self.net_connection_state
}
/// Sets net_connection_state to ShuttingDown.
pub fn on_shutdown(&mut self) {
self.net_connection_state = NetworkConnectionState::ShuttingDown;
}
/// Advances the state.
///
/// Event hooks invoked externally may trigger a new [`PeerAction`] that are buffered until

View File

@@ -57,8 +57,6 @@ pub(crate) struct Swarm<C> {
sessions: SessionManager,
/// Tracks the entire state of the network and handles events received from the sessions.
state: NetworkState<C>,
/// Tracks the connection state of the node
net_connection_state: NetworkConnectionState,
}
// === impl Swarm ===
@@ -69,9 +67,8 @@ impl<C> Swarm<C> {
incoming: ConnectionListener,
sessions: SessionManager,
state: NetworkState<C>,
net_connection_state: NetworkConnectionState,
) -> Self {
Self { incoming, sessions, state, net_connection_state }
Self { incoming, sessions, state }
}
/// Adds an additional protocol handler to the RLPx sub-protocol list.
@@ -274,13 +271,18 @@ where
/// Set network connection state to `ShuttingDown`
pub(crate) fn on_shutdown_requested(&mut self) {
self.net_connection_state = NetworkConnectionState::ShuttingDown;
self.state_mut().peers_mut().on_shutdown();
}
/// Checks if the node's network connection state is 'ShuttingDown'
#[inline]
pub(crate) fn is_shutting_down(&self) -> bool {
matches!(self.net_connection_state, NetworkConnectionState::ShuttingDown)
self.state().peers().connection_state().is_shutting_down()
}
/// Set network connection state to `Hibernate` or `Active`
pub(crate) fn on_network_state_change(&mut self, network_state: NetworkConnectionState) {
self.state_mut().peers_mut().on_network_state_change(network_state);
}
}
@@ -423,9 +425,27 @@ pub(crate) enum SwarmEvent {
/// Represents the state of the connection of the node. If shutting down,
/// new connections won't be established.
/// When in hibernation mode, the node will not initiate new outbound connections. This is
/// beneficial for sync stages that do not require a network connection.
#[derive(Debug, Default)]
pub(crate) enum NetworkConnectionState {
pub enum NetworkConnectionState {
/// Node is active, new outbound connections will be established.
#[default]
Active,
/// Node is shutting down, no new outbound connections will be established.
ShuttingDown,
/// Hibernate Network connection, no new outbound connections will be established.
Hibernate,
}
impl NetworkConnectionState {
/// Returns true if the node is active.
pub(crate) fn is_active(&self) -> bool {
matches!(self, NetworkConnectionState::Active)
}
/// Returns true if the node is shutting down.
pub(crate) fn is_shutting_down(&self) -> bool {
matches!(self, NetworkConnectionState::ShuttingDown)
}
}