Performance metrics for NetworkManager future (#6746)

Emilia Hane
2024-02-29 03:55:09 +01:00
committed by GitHub
parent 859406666e
commit 559124ac5a
3 changed files with 334 additions and 271 deletions

View File

@@ -56,6 +56,7 @@ use std::{
Arc,
},
task::{Context, Poll},
time::{Duration, Instant},
};
use tokio::sync::mpsc::{self, error::TrySendError};
use tokio_stream::wrappers::UnboundedReceiverStream;
@@ -147,6 +148,19 @@ impl<C> NetworkManager<C> {
pub fn secret_key(&self) -> SecretKey {
self.swarm.sessions().secret_key()
}
#[inline]
fn update_poll_metrics(&self, start: Instant, poll_durations: NetworkManagerPollDurations) {
let metrics = &self.metrics;
let NetworkManagerPollDurations { acc_network_handle, acc_swarm } = poll_durations;
// update metrics for whole poll function
metrics.duration_poll_network_manager.set(start.elapsed().as_secs_f64());
// update poll metrics for nested items
metrics.duration_poll_network_handle.set(acc_network_handle.as_secs_f64());
metrics.duration_poll_swarm.set(acc_swarm.as_secs_f64());
}
}
impl<C> NetworkManager<C>
@@ -602,6 +616,249 @@ where
}
}
}
fn on_swarm_event(&mut self, event: SwarmEvent) {
// handle event
match event {
SwarmEvent::ValidMessage { peer_id, message } => self.on_peer_message(peer_id, message),
SwarmEvent::InvalidCapabilityMessage { peer_id, capabilities, message } => {
self.on_invalid_message(peer_id, capabilities, message);
self.metrics.invalid_messages_received.increment(1);
}
SwarmEvent::TcpListenerClosed { remote_addr } => {
trace!(target: "net", ?remote_addr, "TCP listener closed.");
}
SwarmEvent::TcpListenerError(err) => {
trace!(target: "net", %err, "TCP connection error.");
}
SwarmEvent::IncomingTcpConnection { remote_addr, session_id } => {
trace!(target: "net", ?session_id, ?remote_addr, "Incoming connection");
self.metrics.total_incoming_connections.increment(1);
self.metrics
.incoming_connections
.set(self.swarm.state().peers().num_inbound_connections() as f64);
}
SwarmEvent::OutgoingTcpConnection { remote_addr, peer_id } => {
trace!(target: "net", ?remote_addr, ?peer_id, "Starting outbound connection.");
self.metrics.total_outgoing_connections.increment(1);
self.metrics
.outgoing_connections
.set(self.swarm.state().peers().num_outbound_connections() as f64);
}
SwarmEvent::SessionEstablished {
peer_id,
remote_addr,
client_version,
capabilities,
version,
messages,
status,
direction,
} => {
let total_active = self.num_active_peers.fetch_add(1, Ordering::Relaxed) + 1;
self.metrics.connected_peers.set(total_active as f64);
trace!(
target: "net",
?remote_addr,
%client_version,
?peer_id,
?total_active,
kind=%direction,
peer_enode=%NodeRecord::new(remote_addr, peer_id),
"Session established"
);
if direction.is_incoming() {
self.swarm
.state_mut()
.peers_mut()
.on_incoming_session_established(peer_id, remote_addr);
}
self.event_listeners.notify(NetworkEvent::SessionEstablished {
peer_id,
remote_addr,
client_version,
capabilities,
version,
status,
messages,
});
}
SwarmEvent::PeerAdded(peer_id) => {
trace!(target: "net", ?peer_id, "Peer added");
self.event_listeners.notify(NetworkEvent::PeerAdded(peer_id));
self.metrics.tracked_peers.set(self.swarm.state().peers().num_known_peers() as f64);
}
SwarmEvent::PeerRemoved(peer_id) => {
trace!(target: "net", ?peer_id, "Peer dropped");
self.event_listeners.notify(NetworkEvent::PeerRemoved(peer_id));
self.metrics.tracked_peers.set(self.swarm.state().peers().num_known_peers() as f64);
}
SwarmEvent::SessionClosed { peer_id, remote_addr, error } => {
let total_active = self.num_active_peers.fetch_sub(1, Ordering::Relaxed) - 1;
self.metrics.connected_peers.set(total_active as f64);
trace!(
target: "net",
?remote_addr,
?peer_id,
?total_active,
?error,
"Session disconnected"
);
let mut reason = None;
if let Some(ref err) = error {
// If the connection was closed due to an error, we report the peer
self.swarm.state_mut().peers_mut().on_active_session_dropped(
&remote_addr,
&peer_id,
err,
);
reason = err.as_disconnected();
} else {
// Gracefully disconnected
self.swarm.state_mut().peers_mut().on_active_session_gracefully_closed(peer_id);
}
self.metrics.closed_sessions.increment(1);
// This can either be an incoming or outgoing connection which was closed,
// so we update both metrics
self.metrics
.incoming_connections
.set(self.swarm.state().peers().num_inbound_connections() as f64);
self.metrics
.outgoing_connections
.set(self.swarm.state().peers().num_outbound_connections() as f64);
if let Some(reason) = reason {
self.disconnect_metrics.increment(reason);
}
self.metrics.backed_off_peers.set(
self.swarm
.state()
.peers()
.num_backed_off_peers()
.saturating_sub(1)
as f64,
);
self.event_listeners.notify(NetworkEvent::SessionClosed { peer_id, reason });
}
SwarmEvent::IncomingPendingSessionClosed { remote_addr, error } => {
trace!(
target: "net",
?remote_addr,
?error,
"Incoming pending session failed"
);
if let Some(ref err) = error {
self.swarm
.state_mut()
.peers_mut()
.on_incoming_pending_session_dropped(remote_addr, err);
self.metrics.pending_session_failures.increment(1);
if let Some(reason) = err.as_disconnected() {
self.disconnect_metrics.increment(reason);
}
} else {
self.swarm
.state_mut()
.peers_mut()
.on_incoming_pending_session_gracefully_closed();
}
self.metrics.closed_sessions.increment(1);
self.metrics
.incoming_connections
.set(self.swarm.state().peers().num_inbound_connections() as f64);
self.metrics.backed_off_peers.set(
self.swarm
.state()
.peers()
.num_backed_off_peers()
.saturating_sub(1)
as f64,
);
}
SwarmEvent::OutgoingPendingSessionClosed { remote_addr, peer_id, error } => {
trace!(
target: "net",
?remote_addr,
?peer_id,
?error,
"Outgoing pending session failed"
);
if let Some(ref err) = error {
self.swarm.state_mut().peers_mut().on_pending_session_dropped(
&remote_addr,
&peer_id,
err,
);
self.metrics.pending_session_failures.increment(1);
if let Some(reason) = err.as_disconnected() {
self.disconnect_metrics.increment(reason);
}
} else {
self.swarm
.state_mut()
.peers_mut()
.on_pending_session_gracefully_closed(&peer_id);
}
self.metrics.closed_sessions.increment(1);
self.metrics
.outgoing_connections
.set(self.swarm.state().peers().num_outbound_connections() as f64);
self.metrics.backed_off_peers.set(
self.swarm
.state()
.peers()
.num_backed_off_peers()
.saturating_sub(1)
as f64,
);
}
SwarmEvent::OutgoingConnectionError { remote_addr, peer_id, error } => {
trace!(
target: "net",
?remote_addr,
?peer_id,
%error,
"Outgoing connection error"
);
self.swarm.state_mut().peers_mut().on_outgoing_connection_failure(
&remote_addr,
&peer_id,
&error,
);
self.metrics
.outgoing_connections
.set(self.swarm.state().peers().num_outbound_connections() as f64);
self.metrics.backed_off_peers.set(
self.swarm
.state()
.peers()
.num_backed_off_peers()
.saturating_sub(1)
as f64,
);
}
SwarmEvent::BadMessage { peer_id } => {
self.swarm
.state_mut()
.peers_mut()
.apply_reputation_change(&peer_id, ReputationChangeKind::BadMessage);
self.metrics.invalid_messages_received.increment(1);
}
SwarmEvent::ProtocolBreach { peer_id } => {
self.swarm
.state_mut()
.peers_mut()
.apply_reputation_change(&peer_id, ReputationChangeKind::BadProtocol);
}
}
}
}
impl<C> NetworkManager<C>
@@ -639,20 +896,25 @@ where
type Output = ();
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let start = Instant::now();
let mut poll_durations = NetworkManagerPollDurations::default();
let this = self.get_mut();
// poll new block imports (expected to be a noop for POS)
while let Poll::Ready(outcome) = this.block_import.poll(cx) {
this.on_block_import_result(outcome);
}
// process incoming messages from a handle
let start_network_handle = Instant::now();
loop {
match this.from_handle_rx.poll_next_unpin(cx) {
Poll::Pending => break,
Poll::Ready(None) => {
// This is only possible if the channel was deliberately closed since we always
// have an instance of `NetworkHandle`
error!("Network message channel closed.");
return Poll::Ready(())
}
@@ -660,22 +922,25 @@ where
};
}
poll_durations.acc_network_handle = start_network_handle.elapsed();
// This loop drives the entire state of the network and does a lot of work. Under heavy
// load (many messages/events), data may arrive faster than it can be processed (incoming
// messages/requests -> events), and more data may already have arrived by the time an
// internal event is processed, which could turn this loop into a busy loop. Without
// yielding back to the executor, it can starve other tasks waiting on that executor to
// execute them or to drive underlying resources. To prevent this, we preemptively return
// control when the `budget` is exhausted. The value itself is chosen somewhat
// arbitrarily: it is high enough that the swarm can make meaningful progress but low
// enough that this loop does not starve other tasks for too long. If the budget is
// exhausted we manually yield back control to the (coop) scheduler. This manual yield
// point should prevent situations where polling appears to be frozen. See also
// <https://tokio.rs/blog/2020-04-preemption> and Tokio's docs on cooperative scheduling:
// <https://docs.rs/tokio/latest/tokio/task/#cooperative-scheduling>
//
// Testing has shown that this loop naturally reaches the pending state within 1-5
// iterations in << 100µs in most cases. On average it requires ~50µs, which is inside the
// range of what's recommended as a rule of thumb.
// <https://ryhl.io/blog/async-what-is-blocking/>
let mut budget = 10;
@@ -683,246 +948,7 @@ where
// advance the swarm
match this.swarm.poll_next_unpin(cx) {
Poll::Pending | Poll::Ready(None) => break,
Poll::Ready(Some(event)) => this.on_swarm_event(event),
}
// ensure we still have enough budget for another iteration
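// NOTE: the hunk ends before the budget check itself; the yield pattern the comment
// above describes is roughly the following sketch (not the verbatim lines of this
// commit):
budget -= 1;
if budget == 0 {
    // Out of budget: register the waker so the task is scheduled again right away,
    // then yield control back to the executor.
    cx.waker().wake_by_ref();
    break
}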
@@ -935,6 +961,11 @@ where
}
}
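// The swarm loop started right after `acc_network_handle` was taken, so subtracting
// that duration from the time elapsed since `start_network_handle` yields the swarm
// duration.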
poll_durations.acc_swarm =
start_network_handle.elapsed() - poll_durations.acc_network_handle;
this.update_poll_metrics(start, poll_durations);
Poll::Pending
}
}
@@ -979,3 +1010,9 @@ pub enum NetworkEvent {
pub enum DiscoveredEvent {
EventQueued { peer_id: PeerId, socket_addr: SocketAddr, fork_id: Option<ForkId> },
}
#[derive(Debug, Default)]
struct NetworkManagerPollDurations {
acc_network_handle: Duration,
acc_swarm: Duration,
}
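Pieced together from the hunks above, the per-poll measurement flow reduces to the following sketch (bodies elided; names match the commit):

fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> {
    let start = Instant::now();
    let mut poll_durations = NetworkManagerPollDurations::default();
    let this = self.get_mut();

    // ... poll block imports ...

    let start_network_handle = Instant::now();
    // ... drain messages arriving via `from_handle_rx` ...
    poll_durations.acc_network_handle = start_network_handle.elapsed();

    // ... advance the swarm under a fixed budget, yielding when it is exhausted ...
    poll_durations.acc_swarm =
        start_network_handle.elapsed() - poll_durations.acc_network_handle;

    this.update_poll_metrics(start, poll_durations);
    Poll::Pending
}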

View File

@@ -45,6 +45,30 @@ pub struct NetworkMetrics {
/// Number of Eth Requests dropped due to channel being at full capacity
pub(crate) total_dropped_eth_requests_at_full_capacity: Counter,
/* ================ POLL DURATION ================ */
/* -- Total poll duration of `NetworkManager` future -- */
/// Duration in seconds of a call to
/// [`NetworkManager`](crate::NetworkManager)'s poll function.
///
/// The true duration of this call; it should be the sum of the accumulated durations of
/// polling nested items.
pub(crate) duration_poll_network_manager: Gauge,
/* -- Poll duration of items nested in `NetworkManager` future -- */
/// Time spent streaming messages sent over the [`NetworkHandle`](crate::NetworkHandle), which
/// can be cloned and shared via [`NetworkManager::handle`](crate::NetworkManager::handle), in
/// one call to poll the [`NetworkManager`](crate::NetworkManager) future.
///
/// Duration in seconds.
// todo: find out how many components hold the network handle.
pub(crate) duration_poll_network_handle: Gauge,
/// Time spent polling [`Swarm`](crate::swarm::Swarm), in one call to poll the
/// [`NetworkManager`](crate::NetworkManager) future.
///
/// Duration in seconds.
pub(crate) duration_poll_swarm: Gauge,
}
/// Metrics for SessionManager
@@ -119,7 +143,7 @@ pub struct TransactionsManagerMetrics {
/// [`TransactionsManager`](crate::transactions::TransactionsManager)'s poll function.
///
/// Updating metrics could take time, so the true duration of this call could
/// be longer than the sum of the accumulated durations of polling nested items.
pub(crate) duration_poll_tx_manager: Gauge,
/* -- Poll duration of items nested in `TransactionsManager` future -- */
@@ -167,6 +191,19 @@ pub struct TransactionsManagerMetrics {
pub(crate) acc_duration_poll_commands: Gauge,
}
/// Measures the duration of executing the given code block. The duration is added to the given
/// accumulator value passed as a mutable reference.
#[macro_export]
macro_rules! duration_metered_exec {
($code:block, $acc:ident) => {
let start = Instant::now();
$code;
*$acc += start.elapsed();
};
}
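// Because `$acc` is an `ident` that gets dereferenced (`*$acc`), callers first bind a
// `&mut Duration`; `Instant` must also be in scope at the call site. A minimal usage
// sketch (hypothetical accumulator name, not part of this commit):
//
// let mut acc_swarm = Duration::ZERO;
// let acc = &mut acc_swarm;
// duration_metered_exec!(
//     {
//         // work whose execution time should be accumulated
//     },
//     acc
// );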
/// Metrics for Disconnection types
///
/// These are just counters, and ideally we would implement these metrics on a peer-by-peer basis,

View File

@@ -29,6 +29,7 @@
use crate::{
cache::LruCache,
duration_metered_exec,
manager::NetworkEvent,
message::{PeerRequest, PeerRequestSender},
metrics::{TransactionsManagerMetrics, NETWORK_POOL_TRANSACTIONS_SCOPE},
@@ -369,7 +370,7 @@ where
// update metrics for whole poll function
metrics.duration_poll_tx_manager.set(start.elapsed().as_secs_f64());
// update metrics for nested expressions
metrics.acc_duration_poll_network_events.set(acc_network_events.as_secs_f64());
metrics.acc_duration_poll_pending_pool_imports.set(acc_pending_imports.as_secs_f64());
metrics.acc_duration_poll_transaction_events.set(acc_tx_events.as_secs_f64());
@@ -1163,18 +1164,6 @@ where
}
}
#[derive(Debug, Default)]
struct TxManagerPollDurations {
acc_network_events: Duration,